Change return value of shiftKmer from bitmap to genecode
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 1ed6f80..5be5f83 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class GeneCode {
@@ -86,6 +101,10 @@
         return -1;
     }
 
+    public static byte getBitMapFromGeneCode(byte t) {
+        return (byte) (1 << t);
+    }
+
     public static int countNumberOfBitSet(int i) {
         int c = 0;
         for (; i != 0; c++) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 8b9f9bd..c211ce7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -12,6 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package edu.uci.ics.genomix.type;
 
 import java.io.DataInput;
@@ -46,13 +47,13 @@
 
     public KmerBytesWritable(int k, byte[] storage) {
         this.kmerlength = k;
-        if (k > 0){
+        if (k > 0) {
             this.size = KmerUtil.getByteNumFromK(kmerlength);
             this.bytes = storage;
-            if (this.bytes.length < size){
+            if (this.bytes.length < size) {
                 throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
             }
-        }else{
+        } else {
             this.bytes = storage;
             this.size = 0;
         }
@@ -182,7 +183,7 @@
         int pos = ((kmerlength - 1) % 4) << 1;
         byte code = (byte) (c << pos);
         bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
-        return (byte) (1 << output);
+        return output;
     }
 
     /**
@@ -215,7 +216,7 @@
             bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
         }
         bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
-        return (byte) (1 << output);
+        return output;
     }
 
     public void set(KmerBytesWritable newData) {
@@ -248,7 +249,7 @@
 
     @Override
     public int hashCode() {
-        return super.hashCode() * this.kmerlength;
+        return super.hashCode() * 31 + this.kmerlength;
     }
 
     @Override
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
index 59cc3b0..68dec30 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class KmerUtil {
@@ -19,7 +34,7 @@
     public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
         StringBuilder strKmer = new StringBuilder();
         int byteId = keyStart + keyLength - 1;
-        if (byteId < 0){
+        if (byteId < 0) {
             return empty;
         }
         byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index a66d408..fc63fd8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class VKmerBytesWritable extends KmerBytesWritable {
@@ -10,8 +25,8 @@
     public VKmerBytesWritable() {
         super();
     }
-    
-    public VKmerBytesWritable(int k, byte[] storage){
+
+    public VKmerBytesWritable(int k, byte[] storage) {
         super(k, storage);
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
index ab1e633..9bd6acb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class VKmerBytesWritableFactory {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
index ee806cc..7aa05f5 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
@@ -1,15 +1,29 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type.old;
 
 @Deprecated
 public class Kmer {
-    
+
     public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
 
     public final static class GENE_CODE {
 
         /**
-         * make sure this 4 ids equal to the sequence id of char in
-         * {@GENE_SYMBOL}
+         * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
          */
         public static final byte A = 0;
         public static final byte C = 1;
@@ -19,22 +33,22 @@
         public static byte getCodeFromSymbol(byte ch) {
             byte r = 0;
             switch (ch) {
-            case 'A':
-            case 'a':
-                r = A;
-                break;
-            case 'C':
-            case 'c':
-                r = C;
-                break;
-            case 'G':
-            case 'g':
-                r = G;
-                break;
-            case 'T':
-            case 't':
-                r = T;
-                break;
+                case 'A':
+                case 'a':
+                    r = A;
+                    break;
+                case 'C':
+                case 'c':
+                    r = C;
+                    break;
+                case 'G':
+                case 'g':
+                    r = G;
+                    break;
+                case 'T':
+                case 't':
+                    r = T;
+                    break;
             }
             return r;
         }
@@ -49,42 +63,44 @@
         public static byte getAdjBit(byte t) {
             byte r = 0;
             switch (t) {
-            case 'A':
-            case 'a':
-                r = 1 << A;
-                break;
-            case 'C':
-            case 'c':
-                r = 1 << C;
-                break;
-            case 'G':
-            case 'g':
-                r = 1 << G;
-                break;
-            case 'T':
-            case 't':
-                r = 1 << T;
-                break;
+                case 'A':
+                case 'a':
+                    r = 1 << A;
+                    break;
+                case 'C':
+                case 'c':
+                    r = 1 << C;
+                    break;
+                case 'G':
+                case 'g':
+                    r = 1 << G;
+                    break;
+                case 'T':
+                case 't':
+                    r = 1 << T;
+                    break;
             }
             return r;
         }
 
-        /** 
-         * It works for path merge. 
+        /**
+         * It works for path merge.
          * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
-         * @param t the neighbor code in BitMap
-         * @return the genecode 
+         * 
+         * @param t
+         *            the neighbor code in BitMap
+         * @return the genecode
          */
         public static byte getGeneCodeFromBitMap(byte t) {
             switch (t) {
-            case 1 << A:
-                return A;
-            case 1 << C:
-                return C;
-            case 1 << G:
-                return G;
-            case 1 << T:
-                return T;
+                case 1 << A:
+                    return A;
+                case 1 << C:
+                    return C;
+                case 1 << G:
+                    return G;
+                case 1 << T:
+                    return T;
             }
             return -1;
         }
@@ -112,8 +128,7 @@
         }
     }
 
-    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
-            int keyLength) {
+    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
         StringBuilder strKmer = new StringBuilder();
         int byteId = keyStart + keyLength - 1;
         byte currentbyte = keyData[byteId];
@@ -277,8 +292,7 @@
         if (k % 4 != 0) {
             kmer[0] &= (1 << ((k % 4) << 1)) - 1;
         }
-        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE
-                .getCodeFromSymbol(c));
+        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
         return (byte) (1 << output);
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
index dd76427..7a96d2f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
@@ -8,8 +8,7 @@
 import org.apache.hadoop.io.WritableComparator;
 
 @Deprecated
-public class KmerBytesWritable extends BinaryComparable implements
-        WritableComparable<BinaryComparable> {
+public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
     private static final int LENGTH_BYTES = 4;
     private static final byte[] EMPTY_BYTES = {};
     private byte size;
@@ -126,8 +125,7 @@
         }
 
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
-                    s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
         }
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
index 7e9d2f3..a4b1bec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
@@ -22,16 +22,16 @@
     }
 
     /**
-     * Get last kmer from kmer-chain. 
+     * Get last kmer from kmer-chain.
      * e.g. kmerChain is AAGCTA, if k =5, it will
      * return AGCTA
+     * 
      * @param k
      * @param kInChain
      * @param kmerChain
      * @return LastKmer bytes array
      */
-    public static byte[] getLastKmerFromChain(int k, int kInChain,
-            byte[] kmerChain, int offset, int length) {
+    public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
         if (k > kInChain) {
             return null;
         }
@@ -66,8 +66,7 @@
      * @param kmerChain
      * @return FirstKmer bytes array
      */
-    public static byte[] getFirstKmerFromChain(int k, int kInChain,
-            byte[] kmerChain, int offset, int length) {
+    public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
         if (k > kInChain) {
             return null;
         }
@@ -94,9 +93,13 @@
      * Merge kmer with next neighbor in gene-code format.
      * The k of new kmer will increase by 1
      * e.g. AAGCT merge with A => AAGCTA
-     * @param k :input k of kmer
-     * @param kmer : input bytes of kmer
-     * @param nextCode: next neighbor in gene-code format
+     * 
+     * @param k
+     *            :input k of kmer
+     * @param kmer
+     *            : input bytes of kmer
+     * @param nextCode
+     *            : next neighbor in gene-code format
      * @return the merged Kmer, this K of this Kmer is k+1
      */
     public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
@@ -120,9 +123,13 @@
      * Merge kmer with previous neighbor in gene-code format.
      * The k of new kmer will increase by 1
      * e.g. AAGCT merge with A => AAAGCT
-     * @param k :input k of kmer
-     * @param kmer : input bytes of kmer
-     * @param preCode: next neighbor in gene-code format
+     * 
+     * @param k
+     *            :input k of kmer
+     * @param kmer
+     *            : input bytes of kmer
+     * @param preCode
+     *            : next neighbor in gene-code format
      * @return the merged Kmer,this K of this Kmer is k+1
      */
     public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
@@ -147,10 +154,15 @@
     /**
      * Merge two kmer to one kmer
      * e.g. ACTA + ACCGT => ACTAACCGT
-     * @param preK : previous k of kmer
-     * @param kmerPre : bytes array of previous kmer
-     * @param nextK : next k of kmer
-     * @param kmerNext : bytes array of next kmer
+     * 
+     * @param preK
+     *            : previous k of kmer
+     * @param kmerPre
+     *            : bytes array of previous kmer
+     * @param nextK
+     *            : next k of kmer
+     * @param kmerNext
+     *            : bytes array of next kmer
      * @return merged kmer, the new k is @preK + @nextK
      */
     public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
@@ -161,7 +173,7 @@
         for (; i <= lengthPre; i++) {
             mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
         }
-        if ( i > 1){
+        if (i > 1) {
             i--;
         }
         if (preK % 4 == 0) {
@@ -172,52 +184,58 @@
             int posNeedToMove = ((preK % 4) << 1);
             mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
             for (int j = 1; j < lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext
-                        - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext + lengthNext
-                        - j - 1] << posNeedToMove));
+                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
+                        + lengthNext - j - 1] << posNeedToMove));
             }
-            if ( (nextK % 4) * 2 + posNeedToMove > 8) {
+            if ((nextK % 4) * 2 + posNeedToMove > 8) {
                 mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
             }
         }
         return mergedKmer;
     }
-    
+
     /**
      * Safely shifted the kmer forward without change the input kmer
      * e.g. AGCGC shift with T => GCGCT
-     * @param k: kmer length
-     * @param kmer: input kmer
-     * @param afterCode: input genecode 
+     * 
+     * @param k
+     *            : kmer length
+     * @param kmer
+     *            : input kmer
+     * @param afterCode
+     *            : input genecode
      * @return new created kmer that shifted by afterCode, the K will not change
      */
-    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode){
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
+        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
         Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
         return shifted;
     }
-    
+
     /**
      * Safely shifted the kmer backward without change the input kmer
      * e.g. AGCGC shift with T => TAGCG
-     * @param k: kmer length
-     * @param kmer: input kmer
-     * @param preCode: input genecode 
+     * 
+     * @param k
+     *            : kmer length
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : input genecode
      * @return new created kmer that shifted by preCode, the K will not change
      */
-    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode){
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
+        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
         Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
         return shifted;
     }
 
-    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer,
-            int offset, int length) {
+    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
         if (pos >= k) {
             return -1;
         }
         int posByte = pos / 4;
-        int shift = (pos  % 4) << 1;
+        int shift = (pos % 4) << 1;
         return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
     }
 }
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
index f21da91..faee509 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.kmer;
 
 import junit.framework.Assert;
@@ -33,7 +48,7 @@
         }
 
         byte out = kmer.shiftKmerWithNextChar(array[array.length - 1]);
-        Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+        Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
         Assert.assertEquals(kmer.toString(), "ATAGAAG");
     }
 
@@ -59,7 +74,7 @@
         }
 
         byte out = kmer.shiftKmerWithPreChar(array[array.length - 1]);
-        Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+        Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
         Assert.assertEquals(kmer.toString(), "GAATAGA");
     }
 
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
index 6611752..7c4c675 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.kmer;
 
 import org.junit.Assert;
@@ -31,49 +46,49 @@
         for (int i = 8; i > 0; i--) {
             lastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
-            lastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+            lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
         }
         VKmerBytesWritable vlastKmer;
         for (int i = 8; i > 0; i--) {
             vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
-            vlastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+            vlastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
         }
     }
-    
+
     @Test
-    public void TestGetFirstKmer(){
+    public void TestGetFirstKmer() {
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
         KmerBytesWritable firstKmer;
         for (int i = 8; i > 0; i--) {
             firstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
-            firstKmer = kmerFactory.getSubKmerFromChain(0,i,kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());   
+            Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
+            firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
+            Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
         }
         VKmerBytesWritable vfirstKmer;
         for (int i = 8; i > 0; i--) {
             vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+            Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
             vfirstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());   
+            Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
         }
     }
-    
-    @Test 
-    public void TestGetSubKmer(){
+
+    @Test
+    public void TestGetSubKmer() {
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
         VKmerBytesWritable subKmer;
-        for (int istart = 0; istart < kmer.getKmerLength()-1; istart++) {
-            for(int isize = 1; isize + istart <= kmer.getKmerLength(); isize ++){
+        for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
+            for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
                 subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
-                Assert.assertEquals("AGCTGACCG".substring(istart, istart+isize), subKmer.toString());  
+                Assert.assertEquals("AGCTGACCG".substring(istart, istart + isize), subKmer.toString());
             }
         }
     }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d36be17..f3f4584 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -87,14 +87,14 @@
             output.collect(outputKmer, outputAdjList);
             /** middle kmer */
             for (int i = KMER_SIZE; i < array.length - 1; i++) {
-                pre = outputKmer.shiftKmerWithNextChar(array[i]);
+                pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
                 next = GeneCode.getAdjBit(array[i + 1]);
                 adj = GeneCode.mergePreNextAdj(pre, next);
                 outputAdjList.set(adj, count);
                 output.collect(outputKmer, outputAdjList);
             }
             /** last kmer */
-            pre = outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
+            pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
             next = 0;
             adj = GeneCode.mergePreNextAdj(pre, next);
             outputAdjList.set(adj, count);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 7b0005b..44af11b 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -71,7 +71,8 @@
         SequenceFile.Reader reader = null;
         Path path = new Path(RESULT_PATH + "/part-00000");
         reader = new SequenceFile.Reader(dfs, path, conf); 
-        KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+//        KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        KmerBytesWritable key = new KmerBytesWritable(SIZE_KMER);
         KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         File filePathTo = new File(TEST_SOURCE_DIR);
         BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
index 41e3e4b..ebf1282 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.data.std.accessors;
 
 import java.io.DataInput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index c1e9804..34c29c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.data.std.accessors;
 
 import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
index 86ca296..8aaf380 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.accessors;

 

 import java.nio.ByteBuffer;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
index d1d0054..83b9d12 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.accessors;

 

 import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index b4aa4c7..8febfa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.primitive;

 

 import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d3de2ba..ed1b926 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow;

 

 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

@@ -9,22 +24,19 @@
 

 /**

  * used by precluster groupby

- * 

  */

-public class ConnectorPolicyAssignmentPolicy implements

-		IConnectorPolicyAssignmentPolicy {

-	private static final long serialVersionUID = 1L;

-	private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

-	private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {

+    private static final long serialVersionUID = 1L;

+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

 

-	@Override

-	public IConnectorPolicy getConnectorPolicyAssignment(

-			IConnectorDescriptor c, int nProducers, int nConsumers,

-			int[] fanouts) {

-		if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

-			return senderSideMaterializePolicy;

-		} else {

-			return pipeliningPolicy;

-		}

-	}

+    @Override

+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,

+            int[] fanouts) {

+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

+            return senderSideMaterializePolicy;

+        } else {

+            return pipeliningPolicy;

+        }

+    }

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 413c73b..405e109 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow;
 
 import java.io.DataOutput;
@@ -22,68 +37,65 @@
 @SuppressWarnings("deprecation")
 public class KMerSequenceWriterFactory implements ITupleWriterFactory {
 
-	private static final long serialVersionUID = 1L;
-	private ConfFactory confFactory;
-	private final int kmerlength;
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private final int kmerlength;
 
-	public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
-		this.confFactory = new ConfFactory(conf);
-		this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-	}
+    public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+        this.confFactory = new ConfFactory(conf);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+    }
 
-	public class TupleWriter implements ITupleWriter {
-		public TupleWriter(ConfFactory cf) {
-			this.cf = cf;
-		}
+    public class TupleWriter implements ITupleWriter {
+        public TupleWriter(ConfFactory cf) {
+            this.cf = cf;
+        }
 
-		ConfFactory cf;
-		Writer writer = null;
+        ConfFactory cf;
+        Writer writer = null;
 
-		KmerCountValue reEnterCount = new KmerCountValue();
-		KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+        KmerCountValue reEnterCount = new KmerCountValue();
+        KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
 
-		/**
-		 * assumption is that output never change source!
-		 */
-		@Override
-		public void write(DataOutput output, ITupleReference tuple)
-				throws HyracksDataException {
-			try {
-				byte[] kmer = tuple.getFieldData(0);
-				int keyStart = tuple.getFieldStart(0);
-				int keyLength = tuple.getFieldLength(0);
+        /**
+         * assumption is that output never change source!
+         */
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                byte[] kmer = tuple.getFieldData(0);
+                int keyStart = tuple.getFieldStart(0);
+                int keyLength = tuple.getFieldLength(0);
 
-				byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
-				byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-				reEnterCount.set(bitmap, count);
-				reEnterKey.set(kmer, keyStart, keyLength);
-				writer.append(reEnterKey, reEnterCount);
-			} catch (IOException e) {
-				throw new HyracksDataException(e);
-			}
-		}
+                byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+                byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+                reEnterCount.set(bitmap, count);
+                reEnterKey.set(kmer, keyStart, keyLength);
+                writer.append(reEnterKey, reEnterCount);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
 
-		@Override
-		public void open(DataOutput output) throws HyracksDataException {
-			try {
-				writer = SequenceFile.createWriter(cf.getConf(),
-						(FSDataOutputStream) output, KmerBytesWritable.class,
-						KmerCountValue.class, CompressionType.NONE, null);
-			} catch (IOException e) {
-				throw new HyracksDataException(e);
-			}
-		}
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            try {
+                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+                        KmerCountValue.class, CompressionType.NONE, null);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
 
-		@Override
-		public void close(DataOutput output) throws HyracksDataException {
-			// TODO Auto-generated method stub
-		}
-	}
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+        }
+    }
 
-	@Override
-	public ITupleWriter getTupleWriter(IHyracksTaskContext ctx)
-			throws HyracksDataException {
-		return new TupleWriter(confFactory);
-	}
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter(confFactory);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 4ba38d0..cfa7262 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow;
 
 import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 79ad195..4f5b4da 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow;

 

 import java.nio.ByteBuffer;

@@ -62,13 +77,13 @@
 

                 /** middle kmer */

                 for (int i = k; i < array.length - 1; i++) {

-                    pre = kmer.shiftKmerWithNextChar(array[i]);

+                    pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));

                     next = GeneCode.getAdjBit(array[i + 1]);

                     InsertToFrame(kmer, pre, next, writer);

                 }

 

                 /** last kmer */

-                pre = kmer.shiftKmerWithNextChar(array[array.length - 1]);

+                pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));

                 next = 0;

                 InsertToFrame(kmer, pre, next, writer);

 

@@ -80,12 +95,12 @@
                     InsertToFrame(kmer, pre, next, writer);

                     /** middle kmer */

                     for (int i = k; i < array.length - 1; i++) {

-                        next = kmer.shiftKmerWithPreChar(array[i]);

+                        next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));

                         pre = GeneCode.getAdjBit(array[i + 1]);

                         InsertToFrame(kmer, pre, next, writer);

                     }

                     /** last kmer */

-                    next = kmer.shiftKmerWithPreChar(array[array.length - 1]);

+                    next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));

                     pre = 0;

                     InsertToFrame(kmer, pre, next, writer);

                 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 62680ed..ea70fb0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow.aggregators;

 

 import java.io.DataOutput;

@@ -15,126 +30,110 @@
 

 /**

  * sum

- * 

  */

-public class DistributedMergeLmerAggregateFactory implements

-		IAggregatorDescriptorFactory {

-	private static final long serialVersionUID = 1L;

+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

+    private static final long serialVersionUID = 1L;

 

-	public DistributedMergeLmerAggregateFactory() {

-	}

+    public DistributedMergeLmerAggregateFactory() {

+    }

 

-	public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

-		private static final int MAX = 127;

+    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

+        private static final int MAX = 127;

 

-		@Override

-		public void reset() {

-		}

+        @Override

+        public void reset() {

+        }

 

-		@Override

-		public void close() {

-			// TODO Auto-generated method stub

+        @Override

+        public void close() {

+            // TODO Auto-generated method stub

 

-		}

+        }

 

-		@Override

-		public AggregateState createAggregateStates() {

-			return new AggregateState(new Object() {

-			});

-		}

+        @Override

+        public AggregateState createAggregateStates() {

+            return new AggregateState(new Object() {

+            });

+        }

 

-		protected byte getField(IFrameTupleAccessor accessor, int tIndex,

-				int fieldId) {

-			int tupleOffset = accessor.getTupleStartOffset(tIndex);

-			int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

-			int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-			byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()

-					.array(), offset);

-			return data;

-		}

-		

-		@Override

-		public void init(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			byte count = getField(accessor, tIndex, 2);

+        protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {

+            int tupleOffset = accessor.getTupleStartOffset(tIndex);

+            int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

+            int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

+            byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+            return data;

+        }

 

-			DataOutput fieldOutput = tupleBuilder.getDataOutput();

-			try {

-				fieldOutput.writeByte(bitmap);

-				tupleBuilder.addFieldEndOffset();

-				fieldOutput.writeByte(count);

-				tupleBuilder.addFieldEndOffset();

-			} catch (IOException e) {

-				throw new HyracksDataException(

-						"I/O exception when initializing the aggregator.");

-			}

-		}

+        @Override

+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)

+                throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            byte count = getField(accessor, tIndex, 2);

 

-		@Override

-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,

-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,

-				AggregateState state) throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			short count = getField(accessor, tIndex, 2);

+            DataOutput fieldOutput = tupleBuilder.getDataOutput();

+            try {

+                fieldOutput.writeByte(bitmap);

+                tupleBuilder.addFieldEndOffset();

+                fieldOutput.writeByte(count);

+                tupleBuilder.addFieldEndOffset();

+            } catch (IOException e) {

+                throw new HyracksDataException("I/O exception when initializing the aggregator.");

+            }

+        }

 

-			int statetupleOffset = stateAccessor

-					.getTupleStartOffset(stateTupleIndex);

-			int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,

-					1);

-			int countfieldStart = stateAccessor.getFieldStartOffset(

-					stateTupleIndex, 2);

-			int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()

-					+ bitfieldStart;

-			int countoffset = statetupleOffset

-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;

+        @Override

+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

+                int stateTupleIndex, AggregateState state) throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            short count = getField(accessor, tIndex, 2);

 

-			byte[] data = stateAccessor.getBuffer().array();

+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

+            int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);

+            int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;

+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;

 

-			bitmap |= data[bitoffset];

-			count += data[countoffset];

-			if (count >= MAX) {

-				count = (byte) MAX;

-			}

-			data[bitoffset] = bitmap;

-			data[countoffset] = (byte) count;

-		}

+            byte[] data = stateAccessor.getBuffer().array();

 

-		@Override

-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			byte count = getField(accessor, tIndex, 2);

-			DataOutput fieldOutput = tupleBuilder.getDataOutput();

-			try {

-				fieldOutput.writeByte(bitmap);

-				tupleBuilder.addFieldEndOffset();

-				fieldOutput.writeByte(count);

-				tupleBuilder.addFieldEndOffset();

-			} catch (IOException e) {

-				throw new HyracksDataException(

-						"I/O exception when writing aggregation to the output buffer.");

-			}

+            bitmap |= data[bitoffset];

+            count += data[countoffset];

+            if (count >= MAX) {

+                count = (byte) MAX;

+            }

+            data[bitoffset] = bitmap;

+            data[countoffset] = (byte) count;

+        }

 

-		}

+        @Override

+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                AggregateState state) throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            byte count = getField(accessor, tIndex, 2);

+            DataOutput fieldOutput = tupleBuilder.getDataOutput();

+            try {

+                fieldOutput.writeByte(bitmap);

+                tupleBuilder.addFieldEndOffset();

+                fieldOutput.writeByte(count);

+                tupleBuilder.addFieldEndOffset();

+            } catch (IOException e) {

+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

+            }

 

-		@Override

-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			outputPartialResult(tupleBuilder, accessor, tIndex, state);

-		}

-		

-	}

+        }

 

-	@Override

-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

-			RecordDescriptor inRecordDescriptor,

-			RecordDescriptor outRecordDescriptor, int[] keyFields,

-			int[] keyFieldsInPartialResults) throws HyracksDataException {

-		return new DistributeAggregatorDescriptor();

-	}

+        @Override

+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                AggregateState state) throws HyracksDataException {

+            outputPartialResult(tupleBuilder, accessor, tIndex, state);

+        }

+

+    }

+

+    @Override

+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

+            throws HyracksDataException {

+        return new DistributeAggregatorDescriptor();

+    }

 

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index 330f950..87c0207 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow.aggregators;
 
 import java.io.DataOutput;
@@ -11,106 +26,93 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 
 public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
-	private static final int MAX = 127;
+    private static final int MAX = 127;
 
-	@Override
-	public void reset() {
-	}
+    @Override
+    public void reset() {
+    }
 
-	@Override
-	public void close() {
-		// TODO Auto-generated method stub
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
 
-	}
+    }
 
-	@Override
-	public AggregateState createAggregateStates() {
-		return new AggregateState(new Object() {
-		});
-	}
+    @Override
+    public AggregateState createAggregateStates() {
+        return new AggregateState(new Object() {
+        });
+    }
 
-	protected byte getField(IFrameTupleAccessor accessor, int tIndex,
-			int fieldId) {
-		int tupleOffset = accessor.getTupleStartOffset(tIndex);
-		int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-		int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
-		byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
-				.array(), offset);
-		return data;
-	}
+    protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+        int tupleOffset = accessor.getTupleStartOffset(tIndex);
+        int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+        int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+        byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+        return data;
+    }
 
-	@Override
-	public void init(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		byte count = 1;
+    @Override
+    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        byte count = 1;
 
-		DataOutput fieldOutput = tupleBuilder.getDataOutput();
-		try {
-			fieldOutput.writeByte(bitmap);
-			tupleBuilder.addFieldEndOffset();
-			fieldOutput.writeByte(count);
-			tupleBuilder.addFieldEndOffset();
-		} catch (IOException e) {
-			throw new HyracksDataException(
-					"I/O exception when initializing the aggregator.");
-		}
-	}
+        DataOutput fieldOutput = tupleBuilder.getDataOutput();
+        try {
+            fieldOutput.writeByte(bitmap);
+            tupleBuilder.addFieldEndOffset();
+            fieldOutput.writeByte(count);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException("I/O exception when initializing the aggregator.");
+        }
+    }
 
-	@Override
-	public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-			IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-			AggregateState state) throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		short count = 1;
+    @Override
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+            int stateTupleIndex, AggregateState state) throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        short count = 1;
 
-		int statetupleOffset = stateAccessor
-				.getTupleStartOffset(stateTupleIndex);
-		int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
-				1);
-		int countfieldStart = stateAccessor.getFieldStartOffset(
-				stateTupleIndex, 2);
-		int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
-				+ bitfieldStart;
-		int countoffset = statetupleOffset
-				+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+        int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+        int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+        int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+        int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		byte[] data = stateAccessor.getBuffer().array();
+        byte[] data = stateAccessor.getBuffer().array();
 
-		bitmap |= data[bitoffset];
-		count += data[countoffset];
-		if (count >= MAX) {
-			count = (byte) MAX;
-		}
-		data[bitoffset] = bitmap;
-		data[countoffset] = (byte) count;
-	}
+        bitmap |= data[bitoffset];
+        count += data[countoffset];
+        if (count >= MAX) {
+            count = (byte) MAX;
+        }
+        data[bitoffset] = bitmap;
+        data[countoffset] = (byte) count;
+    }
 
-	@Override
-	public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		byte count = getField(accessor, tIndex, 2);
-		DataOutput fieldOutput = tupleBuilder.getDataOutput();
-		try {
-			fieldOutput.writeByte(bitmap);
-			tupleBuilder.addFieldEndOffset();
-			fieldOutput.writeByte(count);
-			tupleBuilder.addFieldEndOffset();
-		} catch (IOException e) {
-			throw new HyracksDataException(
-					"I/O exception when writing aggregation to the output buffer.");
-		}
+    @Override
+    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        byte count = getField(accessor, tIndex, 2);
+        DataOutput fieldOutput = tupleBuilder.getDataOutput();
+        try {
+            fieldOutput.writeByte(bitmap);
+            tupleBuilder.addFieldEndOffset();
+            fieldOutput.writeByte(count);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+        }
 
-	}
+    }
 
-	@Override
-	public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		outputPartialResult(tupleBuilder, accessor, tIndex, state);
-	}
+    @Override
+    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        outputPartialResult(tupleBuilder, accessor, tIndex, state);
+    }
 
 };
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 58ff8a2..b5eb70f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow.aggregators;

 

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

@@ -8,20 +23,18 @@
 

 /**

  * count

- * 

  */

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

-	private static final long serialVersionUID = 1L;

+    private static final long serialVersionUID = 1L;

 

-	public MergeKmerAggregateFactory() {

-	}

+    public MergeKmerAggregateFactory() {

+    }

 

-	@Override

-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

-			RecordDescriptor inRecordDescriptor,

-			RecordDescriptor outRecordDescriptor, int[] keyFields,

-			int[] keyFieldsInPartialResults) throws HyracksDataException {

-		return new LocalAggregatorDescriptor();

-	}

+    @Override

+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

+            throws HyracksDataException {

+        return new LocalAggregatorDescriptor();

+    }

 

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 99d6c89..27d5e15 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.driver;
 
 import java.net.URL;
@@ -22,122 +37,112 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class Driver {
-	public static enum Plan {
-		BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
-	}
+    public static enum Plan {
+        BUILD_DEBRUJIN_GRAPH,
+        GRAPH_CLEANNING,
+        CONTIGS_GENERATION,
+    }
 
-	private static final String IS_PROFILING = "genomix.driver.profiling";
-	private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
-	private static final String applicationName = GenomixJob.JOB_NAME;
-	private static final Log LOG = LogFactory.getLog(Driver.class);
-	private JobGen jobGen;
-	private boolean profiling;
+    private static final String IS_PROFILING = "genomix.driver.profiling";
+    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
 
-	private int numPartitionPerMachine;
+    private int numPartitionPerMachine;
 
-	private IHyracksClientConnection hcc;
-	private Scheduler scheduler;
+    private IHyracksClientConnection hcc;
+    private Scheduler scheduler;
 
-	public Driver(String ipAddress, int port, int numPartitionPerMachine)
-			throws HyracksException {
-		try {
-			hcc = new HyracksConnection(ipAddress, port);
-			scheduler = new Scheduler(hcc.getNodeControllerInfos());
-		} catch (Exception e) {
-			throw new HyracksException(e);
-		}
-		this.numPartitionPerMachine = numPartitionPerMachine;
-	}
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+        try {
+            hcc = new HyracksConnection(ipAddress, port);
+            scheduler = new Scheduler(hcc.getNodeControllerInfos());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.numPartitionPerMachine = numPartitionPerMachine;
+    }
 
-	public void runJob(GenomixJob job) throws HyracksException {
-		runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
-	}
+    public void runJob(GenomixJob job) throws HyracksException {
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+    }
 
-	public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
-			throws HyracksException {
-		/** add hadoop configurations */
-		URL hadoopCore = job.getClass().getClassLoader()
-				.getResource("core-site.xml");
-		job.addResource(hadoopCore);
-		URL hadoopMapRed = job.getClass().getClassLoader()
-				.getResource("mapred-site.xml");
-		job.addResource(hadoopMapRed);
-		URL hadoopHdfs = job.getClass().getClassLoader()
-				.getResource("hdfs-site.xml");
-		job.addResource(hadoopHdfs);
+    public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+        /** add hadoop configurations */
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.addResource(hadoopHdfs);
 
-		LOG.info("job started");
-		long start = System.currentTimeMillis();
-		long end = start;
-		long time = 0;
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
 
-		this.profiling = profiling;
-		try {
-			Map<String, NodeControllerInfo> ncMap = hcc
-					.getNodeControllerInfos();
-			LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
-			switch (planChoice) {
-			case BUILD_DEBRUJIN_GRAPH:
-			default:
-				jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
-						numPartitionPerMachine);
-				break;
-			}
+        this.profiling = profiling;
+        try {
+            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+            switch (planChoice) {
+                case BUILD_DEBRUJIN_GRAPH:
+                default:
+                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+            }
 
-			start = System.currentTimeMillis();
-			runCreate(jobGen);
-			end = System.currentTimeMillis();
-			time = end - start;
-			LOG.info("result writing finished " + time + "ms");
-			LOG.info("job finished");
-		} catch (Exception e) {
-			throw new HyracksException(e);
-		}
-	}
+            start = System.currentTimeMillis();
+            runCreate(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
 
-	private void runCreate(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification createJob = jobGen.generateJob();
-			execute(createJob);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw e;
-		}
-	}
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification createJob = jobGen.generateJob();
+            execute(createJob);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
 
-	private void execute(JobSpecification job) throws Exception {
-		job.setUseConnectorPolicyForScheduling(false);
-		JobId jobId = hcc.startJob(
-				job,
-				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
-						.noneOf(JobFlag.class));
-		hcc.waitForCompletion(jobId);
-	}
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
 
-	public static void main(String[] args) throws Exception {
-		GenomixJob jobConf = new GenomixJob();
-		String[] otherArgs = new GenericOptionsParser(jobConf, args)
-				.getRemainingArgs();
-		if (otherArgs.length < 4) {
-			System.err.println("Need <serverIP> <port> <input> <output>");
-			System.exit(-1);
-		}
-		String ipAddress = otherArgs[0];
-		int port = Integer.parseInt(otherArgs[1]);
-		int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
-		boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
-		// FileInputFormat.setInputPaths(job, otherArgs[2]);
-		{
-			Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
-			jobConf.set("mapred.input.dir", path.toString());
+    public static void main(String[] args) throws Exception {
+        GenomixJob jobConf = new GenomixJob();
+        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+        if (otherArgs.length < 4) {
+            System.err.println("Need <serverIP> <port> <input> <output>");
+            System.exit(-1);
+        }
+        String ipAddress = otherArgs[0];
+        int port = Integer.parseInt(otherArgs[1]);
+        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+        // FileInputFormat.setInputPaths(job, otherArgs[2]);
+        {
+            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+            jobConf.set("mapred.input.dir", path.toString());
 
-			Path outputDir = new Path(jobConf.getWorkingDirectory(),
-					otherArgs[3]);
-			jobConf.set("mapred.output.dir", outputDir.toString());
-		}
-		// FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
-		// FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
-		Driver driver = new Driver(ipAddress, port, numOfDuplicate);
-		driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
-	}
+            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+            jobConf.set("mapred.output.dir", outputDir.toString());
+        }
+        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 11786f3..be82477 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.io.IOException;
@@ -7,73 +22,73 @@
 
 public class GenomixJob extends JobConf {
 
-	public static final String JOB_NAME = "genomix";
+    public static final String JOB_NAME = "genomix";
 
-	/** Kmers length */
-	public static final String KMER_LENGTH = "genomix.kmer";
-	/** Frame Size */
-	public static final String FRAME_SIZE = "genomix.framesize";
-	/** Frame Limit, hyracks need */
-	public static final String FRAME_LIMIT = "genomix.framelimit";
-	/** Table Size, hyracks need */
-	public static final String TABLE_SIZE = "genomix.tablesize";
-	/** Groupby types */
-	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
-	/** Graph outputformat */
-	public static final String OUTPUT_FORMAT = "genomix.graph.output";
-	/** Get reversed Kmer Sequence */
-	public static final String REVERSED_KMER = "genomix.kmer.reversed";
+    /** Kmers length */
+    public static final String KMER_LENGTH = "genomix.kmer";
+    /** Frame Size */
+    public static final String FRAME_SIZE = "genomix.framesize";
+    /** Frame Limit, hyracks need */
+    public static final String FRAME_LIMIT = "genomix.framelimit";
+    /** Table Size, hyracks need */
+    public static final String TABLE_SIZE = "genomix.tablesize";
+    /** Groupby types */
+    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+    /** Graph outputformat */
+    public static final String OUTPUT_FORMAT = "genomix.graph.output";
+    /** Get reversed Kmer Sequence */
+    public static final String REVERSED_KMER = "genomix.kmer.reversed";
 
-	/** Configurations used by hybrid groupby function in graph build phrase */
-	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
-	public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
-	public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
-	public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
-	public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+    /** Configurations used by hybrid groupby function in graph build phrase */
+    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
 
-	public static final int DEFAULT_KMER = 21;
-	public static final int DEFAULT_FRAME_SIZE = 32768;
-	public static final int DEFAULT_FRAME_LIMIT = 4096;
-	public static final int DEFAULT_TABLE_SIZE = 10485767;
-	public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
-	public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
-	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
-	public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
-	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+    public static final int DEFAULT_KMER = 21;
+    public static final int DEFAULT_FRAME_SIZE = 32768;
+    public static final int DEFAULT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_TABLE_SIZE = 10485767;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
 
-	public static final boolean DEFAULT_REVERSED = false;
+    public static final boolean DEFAULT_REVERSED = false;
 
-	public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
-	public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+    public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+    public static final String DEFAULT_OUTPUT_FORMAT = "binary";
 
-	public GenomixJob() throws IOException {
-		super(new Configuration());
-	}
+    public GenomixJob() throws IOException {
+        super(new Configuration());
+    }
 
-	public GenomixJob(Configuration conf) throws IOException {
-		super(conf);
-	}
+    public GenomixJob(Configuration conf) throws IOException {
+        super(conf);
+    }
 
-	/**
-	 * Set the kmer length
-	 * 
-	 * @param the
-	 *            desired frame size
-	 */
-	final public void setKmerLength(int kmerlength) {
-		setInt(KMER_LENGTH, kmerlength);
-	}
+    /**
+     * Set the kmer length
+     * 
+     * @param the
+     *            desired frame size
+     */
+    final public void setKmerLength(int kmerlength) {
+        setInt(KMER_LENGTH, kmerlength);
+    }
 
-	final public void setFrameSize(int frameSize) {
-		setInt(FRAME_SIZE, frameSize);
-	}
+    final public void setFrameSize(int frameSize) {
+        setInt(FRAME_SIZE, frameSize);
+    }
 
-	final public void setFrameLimit(int frameLimit) {
-		setInt(FRAME_LIMIT, frameLimit);
-	}
+    final public void setFrameLimit(int frameLimit) {
+        setInt(FRAME_LIMIT, frameLimit);
+    }
 
-	final public void setTableSize(int tableSize) {
-		setInt(TABLE_SIZE, tableSize);
-	}
+    final public void setTableSize(int tableSize) {
+        setInt(TABLE_SIZE, tableSize);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 557da6b..b1f9a29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.util.UUID;
@@ -9,19 +24,18 @@
 
 public abstract class JobGen {
 
-	protected final Configuration conf;
-	protected final GenomixJob genomixJob;
-	protected String jobId = new UUID(System.currentTimeMillis(),
-			System.nanoTime()).toString();
+    protected final Configuration conf;
+    protected final GenomixJob genomixJob;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
 
-	public JobGen(GenomixJob job) {
-		this.conf = job;
-		this.genomixJob = job;
-		this.initJobConfiguration();
-	}
+    public JobGen(GenomixJob job) {
+        this.conf = job;
+        this.genomixJob = job;
+        this.initJobConfiguration();
+    }
 
-	protected abstract void initJobConfiguration();
+    protected abstract void initJobConfiguration();
 
-	public abstract JobSpecification generateJob() throws HyracksException;
+    public abstract JobSpecification generateJob() throws HyracksException;
 
 }
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 9462fb0..79b38e8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.util.Map;
@@ -47,292 +62,234 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenBrujinGraph extends JobGen {
-	public enum GroupbyType {
-		EXTERNAL, PRECLUSTER, HYBRIDHASH,
-	}
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
 
-	public enum OutputFormat {
-		TEXT, BINARY,
-	}
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
 
-	JobConf job;
-	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-	private Scheduler scheduler;
-	private String[] ncNodeNames;
+    JobConf job;
+    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private Scheduler scheduler;
+    private String[] ncNodeNames;
 
-	private int kmers;
-	private int frameLimits;
-	private int frameSize;
-	private int tableSize;
-	private GroupbyType groupbyType;
-	private OutputFormat outputFormat;
-	private boolean bGenerateReversedKmer;
+    private int kmers;
+    private int frameLimits;
+    private int frameSize;
+    private int tableSize;
+    private GroupbyType groupbyType;
+    private OutputFormat outputFormat;
+    private boolean bGenerateReversedKmer;
 
-	private AbstractOperatorDescriptor singleGrouper;
-	private IConnectorDescriptor connPartition;
-	private AbstractOperatorDescriptor crossGrouper;
-	private RecordDescriptor readOutputRec;
-	private RecordDescriptor combineOutputRec;
+    private AbstractOperatorDescriptor singleGrouper;
+    private IConnectorDescriptor connPartition;
+    private AbstractOperatorDescriptor crossGrouper;
+    private RecordDescriptor readOutputRec;
+    private RecordDescriptor combineOutputRec;
 
-	/** works for hybrid hashing */
-	private long inputSizeInRawRecords;
-	private long inputSizeInUniqueKeys;
-	private int recordSizeInBytes;
-	private int hashfuncStartLevel;
+    /** works for hybrid hashing */
+    private long inputSizeInRawRecords;
+    private long inputSizeInUniqueKeys;
+    private int recordSizeInBytes;
+    private int hashfuncStartLevel;
 
-	private void logDebug(String status) {
-		String names = "";
-		for (String str : ncNodeNames) {
-			names += str + " ";
-		}
-		LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
-	}
+    private void logDebug(String status) {
+        String names = "";
+        for (String str : ncNodeNames) {
+            names += str + " ";
+        }
+        LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+    }
 
-	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
-			final Map<String, NodeControllerInfo> ncMap,
-			int numPartitionPerMachine) {
-		super(job);
-		this.scheduler = scheduler;
-		String[] nodes = new String[ncMap.size()];
-		ncMap.keySet().toArray(nodes);
-		ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-		for (int i = 0; i < numPartitionPerMachine; i++) {
-			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
-					nodes.length);
-		}
-		logDebug("initialize");
-	}
+    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) {
+        super(job);
+        this.scheduler = scheduler;
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        logDebug("initialize");
+    }
 
-	private ExternalGroupOperatorDescriptor newExternalGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			IAggregatorDescriptorFactory aggeragater) {
-		return new ExternalGroupOperatorDescriptor(
-				jobSpec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(KmerPointable.FACTORY) },
-				new KmerNormarlizedComputerFactory(),
-				aggeragater,
-				new DistributedMergeLmerAggregateFactory(),
-				combineOutputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-										.of(KmerPointable.FACTORY) }),
-						tableSize), true);
-	}
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+                combineOutputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(KmerPointable.FACTORY) }), tableSize), true);
+    }
 
-	private HybridHashGroupOperatorDescriptor newHybridGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			long inputSizeInRawRecords, long inputSizeInUniqueKeys,
-			int recordSizeInBytes, int hashfuncStartLevel,
-			IAggregatorDescriptorFactory aggeragater)
-			throws HyracksDataException {
-		return new HybridHashGroupOperatorDescriptor(
-				jobSpec,
-				keyFields,
-				frameLimits,
-				inputSizeInRawRecords,
-				inputSizeInUniqueKeys,
-				recordSizeInBytes,
-				tableSize,
-				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(KmerPointable.FACTORY) },
-				new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
-				hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
-				aggeragater, new DistributedMergeLmerAggregateFactory(),
-				combineOutputRec, true);
-	}
+    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+            IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
+        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+                combineOutputRec, true);
+    }
 
-	private void generateDescriptorbyType(JobSpecification jobSpec)
-			throws HyracksDataException {
-		int[] keyFields = new int[] { 0 }; // the id of grouped key
+    private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
 
-		switch (groupbyType) {
-		case EXTERNAL:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
-					new KmerHashPartitioncomputerFactory());
-			crossGrouper = newExternalGroupby(jobSpec, keyFields,
-					new DistributedMergeLmerAggregateFactory());
-			break;
-		case PRECLUSTER:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningMergingConnectorDescriptor(
-					jobSpec,
-					new KmerHashPartitioncomputerFactory(),
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(KmerPointable.FACTORY) });
-			crossGrouper = new PreclusteredGroupOperatorDescriptor(
-					jobSpec,
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(KmerPointable.FACTORY) },
-					new DistributedMergeLmerAggregateFactory(),
-					combineOutputRec);
-			break;
-		case HYBRIDHASH:
-		default:
-			singleGrouper = newHybridGroupby(jobSpec, keyFields,
-					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
-					new KmerHashPartitioncomputerFactory());
+        switch (groupbyType) {
+            case EXTERNAL:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+                crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
+                break;
+            case PRECLUSTER:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                        new KmerHashPartitioncomputerFactory(), keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                        new DistributedMergeLmerAggregateFactory(), combineOutputRec);
+                break;
+            case HYBRIDHASH:
+            default:
+                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
 
-			crossGrouper = newHybridGroupby(jobSpec, keyFields,
-					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel,
-					new DistributedMergeLmerAggregateFactory());
-			break;
-		}
-	}
+                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
+                break;
+        }
+    }
 
-	public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
-			throws HyracksDataException {
-		try {
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
 
-			InputSplit[] splits = job.getInputFormat().getSplits(job,
-					ncNodeNames.length);
+            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
 
-			LOG.info("HDFS read into " + splits.length + " splits");
-			String[] readSchedule = scheduler.getLocationConstraints(splits);
-			String log = "";
-			for (String schedule : readSchedule) {
-				log += schedule + " ";
-			}
-			LOG.info("HDFS read schedule " + log);
-			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
-					splits, readSchedule, new ReadsKeyValueParserFactory(kmers,
-							bGenerateReversedKmer));
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+            LOG.info("HDFS read into " + splits.length + " splits");
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            String log = "";
+            for (String schedule : readSchedule) {
+                log += schedule + " ";
+            }
+            LOG.info("HDFS read schedule " + log);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	@Override
-	public JobSpecification generateJob() throws HyracksException {
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
 
-		JobSpecification jobSpec = new JobSpecification();
-		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE });
-		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE,
-				ByteSerializerDeserializer.INSTANCE });
-		jobSpec.setFrameSize(frameSize);
+        JobSpecification jobSpec = new JobSpecification();
+        readOutputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+        jobSpec.setFrameSize(frameSize);
 
-		// File input
-		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+        // File input
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
 
-		logDebug("Read Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				readOperator, ncNodeNames);
+        logDebug("Read Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
 
-		generateDescriptorbyType(jobSpec);
-		logDebug("SingleGroupby Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				singleGrouper, ncNodeNames);
+        generateDescriptorbyType(jobSpec);
+        logDebug("SingleGroupby Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
 
-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
 
-		logDebug("CrossGrouper Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				crossGrouper, ncNodeNames);
-		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+        logDebug("CrossGrouper Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
 
-		// Output
-		ITupleWriterFactory writer = null;
-		switch (outputFormat) {
-		case TEXT:
-			writer = new KMerTextWriterFactory(kmers);
-			break;
-		case BINARY:
-		default:
-			writer = new KMerSequenceWriterFactory(job);
-			break;
-		}
-		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
-				jobSpec, job, writer);
+        // Output
+        ITupleWriterFactory writer = null;
+        switch (outputFormat) {
+            case TEXT:
+                writer = new KMerTextWriterFactory(kmers);
+                break;
+            case BINARY:
+            default:
+                writer = new KMerSequenceWriterFactory(job);
+                break;
+        }
+        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
 
-		logDebug("WriteOperator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				writeOperator, ncNodeNames);
+        logDebug("WriteOperator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
 
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
-		jobSpec.addRoot(writeOperator);
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
+        jobSpec.addRoot(writeOperator);
 
-		if (groupbyType == GroupbyType.PRECLUSTER) {
-			jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-		}
-		return jobSpec;
-	}
+        if (groupbyType == GroupbyType.PRECLUSTER) {
+            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return jobSpec;
+    }
 
-	@Override
-	protected void initJobConfiguration() {
+    @Override
+    protected void initJobConfiguration() {
 
-		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-		if (kmers % 2 == 0){
-		    kmers--;
-		    conf.setInt(GenomixJob.KMER_LENGTH, kmers);
-		}
-		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
-				GenomixJob.DEFAULT_FRAME_LIMIT);
-		tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
-				GenomixJob.DEFAULT_TABLE_SIZE);
-		frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
-				GenomixJob.DEFAULT_FRAME_SIZE);
-		inputSizeInRawRecords = conf.getLong(
-				GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
-		inputSizeInUniqueKeys = conf.getLong(
-				GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
-		recordSizeInBytes = conf.getInt(
-				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
-		hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
-		/** here read the different recordSize why ? */
-		recordSizeInBytes = conf.getInt(
-				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        if (kmers % 2 == 0) {
+            kmers--;
+            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+        }
+        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+        /** here read the different recordSize why ? */
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
 
-		bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER,
-				GenomixJob.DEFAULT_REVERSED);
+        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
 
-		String type = conf.get(GenomixJob.GROUPBY_TYPE,
-				GenomixJob.DEFAULT_GROUPBY_TYPE);
-		if (type.equalsIgnoreCase("external")) {
-			groupbyType = GroupbyType.EXTERNAL;
-		} else if (type.equalsIgnoreCase("precluster")) {
-			groupbyType = GroupbyType.PRECLUSTER;
-		} else {
-			groupbyType = GroupbyType.HYBRIDHASH;
-		}
+        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+        if (type.equalsIgnoreCase("external")) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase("precluster")) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
 
-		String output = conf.get(GenomixJob.OUTPUT_FORMAT,
-				GenomixJob.DEFAULT_OUTPUT_FORMAT);
-		if (output.equalsIgnoreCase("text")) {
-			outputFormat = OutputFormat.TEXT;
-		} else {
-			outputFormat = OutputFormat.BINARY;
-		}
-		job = new JobConf(conf);
-		LOG.info("Genomix Graph Build Configuration");
-		LOG.info("Kmer:" + kmers);
-		LOG.info("Groupby type:" + type);
-		LOG.info("Output format:" + output);
-		LOG.info("Frame limit" + frameLimits);
-		LOG.info("Frame size" + frameSize);
-	}
+        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        job = new JobConf(conf);
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmers);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame size" + frameSize);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
index 39cc6fc..a88fa79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import org.apache.hadoop.mapred.InputSplit;
@@ -21,6 +36,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
@@ -34,146 +50,122 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenStatistic extends JobGen {
-	private int kmers;
-	private JobConf hadoopjob;
-	private RecordDescriptor readOutputRec;
-	private String[] ncNodeNames;
-	private Scheduler scheduler;
-	private RecordDescriptor combineOutputRec;
-	
+    private int kmers;
+    private JobConf hadoopjob;
+    private RecordDescriptor readOutputRec;
+    private String[] ncNodeNames;
+    private Scheduler scheduler;
+    private RecordDescriptor combineOutputRec;
 
-	public JobGenStatistic(GenomixJob job) {
-		super(job);
-		// TODO Auto-generated constructor stub
-	}
+    public JobGenStatistic(GenomixJob job) {
+        super(job);
+        // TODO Auto-generated constructor stub
+    }
 
-	@Override
-	protected void initJobConfiguration() {
-		// TODO Auto-generated method stub
-		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-		hadoopjob = new JobConf(conf);
-		hadoopjob.setInputFormat(SequenceFileInputFormat.class);
-	}
+    @Override
+    protected void initJobConfiguration() {
+        // TODO Auto-generated method stub
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        hadoopjob = new JobConf(conf);
+        hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+    }
 
-	@Override
-	public JobSpecification generateJob() throws HyracksException {
-		// TODO Auto-generated method stub
-		int[] degreeFields = { 0 };
-		int[] countFields = { 1 };
-		JobSpecification jobSpec = new JobSpecification();
-		/** specify the record fields after read */
-		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE,ByteSerializerDeserializer.INSTANCE });
-		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE });
-		/** the reader */
-		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				readOperator, ncNodeNames);
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+        int[] degreeFields = { 0, 1 }; // indegree, outdegree
+        int[] countFields = { 2 };
+        JobSpecification jobSpec = new JobSpecification();
+        /** specify the record fields after read */
+        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        /** the reader */
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
 
-		/** the combiner aggregator */
-		AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
-				jobSpec, degreeFields, readOperator);
-		AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
-				jobSpec, countFields, readOperator);
+        /** the combiner aggregator */
+        AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+        AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
 
-		/** the final aggregator */
-		AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
-				jobSpec, degreeFields, degreeLocal);
-		AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
-				jobSpec, countFields, countLocal);
-		
-		/** writer */
-		AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
-				jobSpec, degreeFields, degreeMerger);
-		AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
-				jobSpec, countFields, countMerger);
-		jobSpec.addRoot(writeDegree);
-		jobSpec.addRoot(writeCount);
-		return null;
-	}
+        /** the final aggregator */
+        AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+        AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
 
-	private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
-			throws HyracksDataException {
-		try {
+        /** writer */
+        AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+        AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+        jobSpec.addRoot(writeDegree);
+        jobSpec.addRoot(writeCount);
+        return null;
+    }
 
-			InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
-					hadoopjob, ncNodeNames.length);
+    private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
 
-			String[] readSchedule = scheduler.getLocationConstraints(splits);
-			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
-					hadoopjob, splits, readSchedule,
-					new StatReadsKeyValueParserFactory());
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+            InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
 
-	private ExternalGroupOperatorDescriptor newExternalGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			IAggregatorDescriptorFactory aggeragater) {
-		return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
-				GenomixJob.DEFAULT_FRAME_LIMIT, new IBinaryComparatorFactory[] {
-						new ByteComparatorFactory() }, null, aggeragater,
-				new StatSumAggregateFactory(),
-				combineOutputRec, new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(keyFields,
-								new IBinaryHashFunctionFactory[] {
-										new ByteComparatorFactory() }),
-						GenomixJob.DEFAULT_TABLE_SIZE), true);
-	}
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+                    new StatReadsKeyValueParserFactory());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	private AbstractOperatorDescriptor connectLocalAggregateByField(
-			JobSpecification jobSpec, int[] fields,
-			HDFSReadOperatorDescriptor readOperator) {
-		AbstractOperatorDescriptor localAggregator = newExternalGroupby(
-				jobSpec, fields, new StatCountAggregateFactory());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				localAggregator, ncNodeNames);
-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
-		return localAggregator;
-	}
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+                new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+                aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                                new ByteComparatorFactory(), new ByteComparatorFactory() }),
+                        GenomixJob.DEFAULT_TABLE_SIZE), true);
+    }
 
-	private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
-			int[] fields, AbstractOperatorDescriptor localAggregator) {
-		AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
-				jobSpec, fields, new StatSumAggregateFactory());
-		// only need one reducer
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
-		IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
-				jobSpec,
-				new ITuplePartitionComputerFactory(){
-					private static final long serialVersionUID = 1L;
-					@Override
-					public ITuplePartitionComputer createPartitioner() {
-						return new ITuplePartitionComputer(){
-							@Override
-							public int partition(IFrameTupleAccessor accessor,
-									int tIndex, int nParts)
-									throws HyracksDataException {
-								return 0;
-							}
-						};
-					}
-				},
-				fields, 
-				new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
-		jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
-		return finalAggregator;
-	}
-	
-	private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int [] fields, AbstractOperatorDescriptor finalAggregator){
-		LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
-				jobSpec, null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+    private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+            HDFSReadOperatorDescriptor readOperator) {
+        AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+                new StatCountAggregateFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+        return localAggregator;
+    }
 
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
-		return writeOperator;
-	}
+    private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor localAggregator) {
+        AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+        // only need one reducer
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+        IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                new ITuplePartitionComputerFactory() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITuplePartitionComputer createPartitioner() {
+                        return new ITuplePartitionComputer() {
+                            @Override
+                            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                                    throws HyracksDataException {
+                                return 0;
+                            }
+                        };
+                    }
+                }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+        jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+        return finalAggregator;
+    }
+
+    private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor finalAggregator) {
+        LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+        return writeOperator;
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
index b469be0..832966b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -7,34 +22,33 @@
 
 public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	@Override
-	public IBinaryComparator createBinaryComparator() {
-		return new IBinaryComparator(){
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
 
-			@Override
-			public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
-					int l2) {
-				return b1[s1]-b2[s2];
-			}
-			
-		};
-	}
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return b1[s1] - b2[s2];
+            }
 
-	@Override
-	public IBinaryHashFunction createBinaryHashFunction() {
-		return new IBinaryHashFunction(){
+        };
+    }
 
-			@Override
-			public int hash(byte[] bytes, int offset, int length) {
-				return bytes[offset];
-			}
-			
-		};
-	}
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        return new IBinaryHashFunction() {
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                return bytes[offset];
+            }
+
+        };
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
index 49a7453..65303a8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.io.DataOutput;
@@ -13,109 +28,102 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class StatCountAggregateFactory implements
-		IAggregatorDescriptorFactory {
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public class CountAggregator implements IAggregatorDescriptor {
+    public class CountAggregator implements IAggregatorDescriptor {
+        private final int[] keyFields;
 
-		@Override
-		public AggregateState createAggregateStates() {
-			// TODO Auto-generated method stub
-			return null;
-		}
+        public CountAggregator(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
 
-		@Override
-		public void init(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = 1;
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when initializing the aggregator.");
-			}
-		}
+        @Override
+        public AggregateState createAggregateStates() {
+            // TODO Auto-generated method stub
+            return null;
+        }
 
-		@Override
-		public void reset() {
-			// TODO Auto-generated method stub
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            int count = 1;
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.");
+            }
+        }
 
-		}
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
 
-		@Override
-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-				AggregateState state) throws HyracksDataException {
-			int count = 1;
+        }
 
-			int statetupleOffset = stateAccessor
-					.getTupleStartOffset(stateTupleIndex);
-			int countfieldStart = stateAccessor.getFieldStartOffset(
-					stateTupleIndex, 1);
-			int countoffset = statetupleOffset
-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            int count = 1;
 
-			byte[] data = stateAccessor.getBuffer().array();
-			count += IntegerSerializerDeserializer.getInt(data, countoffset);
-			IntegerSerializerDeserializer.putInt(count, data, countoffset);
-		}
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		@Override
-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when writing aggregation to the output buffer.");
-			}
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
 
-		}
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+            }
 
-		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-			int tupleOffset = accessor.getTupleStartOffset(tIndex);
-			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
-					+ fieldStart;
-			byte[] data = accessor.getBuffer().array();
+        }
 
-			return IntegerSerializerDeserializer.getInt(data, countoffset);
-		}
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
 
-		@Override
-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			outputPartialResult(tupleBuilder, accessor, tIndex, state);
-		}
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
 
-		@Override
-		public void close() {
-			// TODO Auto-generated method stub
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
+        }
 
-		}
+        @Override
+        public void close() {
+            // TODO Auto-generated method stub
 
-	}
+        }
 
-	@Override
-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int[] keyFields,
-			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		// TODO Auto-generated method stub
-		return new CountAggregator();
-	}
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new CountAggregator(keyFields);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
index 561d64a..2fcca67 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -1,13 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.io.BytesWritable;
-
 import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.KmerUtil;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -17,70 +30,65 @@
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	@Override
-	public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
-			throws HyracksDataException {
-		
-		final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-		final ByteBuffer outputBuffer = ctx.allocateFrame();
-		final FrameTupleAppender outputAppender = new FrameTupleAppender(
-				ctx.getFrameSize());
-		outputAppender.reset(outputBuffer, true);
-		
-		return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+    @Override
+    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+            throws HyracksDataException {
 
-			@Override
-			public void open(IFrameWriter writer) throws HyracksDataException {
-				// TODO Auto-generated method stub
-				
-			}
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
 
-			@Override
-			public void parse(BytesWritable key, KmerCountValue value,
-					IFrameWriter writer) throws HyracksDataException {
-				byte adjMap = value.getAdjBitMap();
-				byte count = value.getCount();
-				InsertToFrame((byte) (GeneCode.inDegree(adjMap)*10+GeneCode.outDegree(adjMap)),count,writer);
-			}
+        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
 
-			@Override
-			public void close(IFrameWriter writer) throws HyracksDataException {
-				FrameUtils.flushFrame(outputBuffer, writer);
-			}
-			
-			private void InsertToFrame(byte degree, byte count,
-					IFrameWriter writer) {
-				try {
-					tupleBuilder.reset();
-					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
-					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+                // TODO Auto-generated method stub
 
-					if (!outputAppender.append(
-							tupleBuilder.getFieldEndOffsets(),
-							tupleBuilder.getByteArray(), 0,
-							tupleBuilder.getSize())) {
-						FrameUtils.flushFrame(outputBuffer, writer);
-						outputAppender.reset(outputBuffer, true);
-						if (!outputAppender.append(
-								tupleBuilder.getFieldEndOffsets(),
-								tupleBuilder.getByteArray(), 0,
-								tupleBuilder.getSize())) {
-							throw new IllegalStateException(
-									"Failed to copy an record into a frame: the record size is too large.");
-						}
-					}
-				} catch (Exception e) {
-					throw new IllegalStateException(e);
-				}
-			}
-		};
-	}
+            }
+
+            @Override
+            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+                    throws HyracksDataException {
+                byte adjMap = value.getAdjBitMap();
+                byte count = value.getCount();
+                InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+
+            private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+                try {
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy an record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
index ce00667..39ac60a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.io.DataOutput;
@@ -15,108 +30,102 @@
 
 public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public class DistributeAggregatorDescriptor implements
-			IAggregatorDescriptor {
+    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
 
-		@Override
-		public AggregateState createAggregateStates() {
-			// TODO Auto-generated method stub
-			return null;
-		}
+        private final int[] keyFields;
 
-		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-			int tupleOffset = accessor.getTupleStartOffset(tIndex);
-			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
-					+ fieldStart;
-			byte[] data = accessor.getBuffer().array();
-			return IntegerSerializerDeserializer.getInt(data, countoffset);
-		}
+        public DistributeAggregatorDescriptor(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
 
-		@Override
-		public void init(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
+        @Override
+        public AggregateState createAggregateStates() {
+            // TODO Auto-generated method stub
+            return null;
+        }
 
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when initializing the aggregator.");
-			}
-		}
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
 
-		@Override
-		public void reset() {
-			// TODO Auto-generated method stub
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
 
-		}
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.");
+            }
+        }
 
-		@Override
-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-				AggregateState state) throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
 
-			int statetupleOffset = stateAccessor
-					.getTupleStartOffset(stateTupleIndex);
-			int countfieldStart = stateAccessor.getFieldStartOffset(
-					stateTupleIndex, 1);
-			int countoffset = statetupleOffset
-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        }
 
-			byte[] data = stateAccessor.getBuffer().array();
-			count += IntegerSerializerDeserializer.getInt(data, countoffset);
-			IntegerSerializerDeserializer.putInt(count, data, countoffset);
-		}
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
 
-		@Override
-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when writing aggregation to the output buffer.");
-			}
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		}
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
 
-		@Override
-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			outputPartialResult(tupleBuilder, accessor, tIndex, state);
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+            }
 
-		}
+        }
 
-		@Override
-		public void close() {
-			// TODO Auto-generated method stub
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
 
-		}
+        }
 
-	}
+        @Override
+        public void close() {
+            // TODO Auto-generated method stub
 
-	@Override
-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int[] keyFields,
-			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		// TODO Auto-generated method stub
-		return new DistributeAggregatorDescriptor();
-	}
+        }
+
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new DistributeAggregatorDescriptor(keyFields);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index bef13b5..847272a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.jobrun;
 
 import java.io.BufferedWriter;
@@ -194,9 +209,10 @@
                     continue;
                 }
                 SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-                
-//                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER));
+
+                //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
+                        GenomixJob.DEFAULT_KMER));
                 KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
 
                 while (reader.next(key, value)) {
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 22688e0..aa1f791 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -24,39 +24,40 @@
 public class TestUtils {
     /**
      * Compare with the sorted expected file.
-     * The actual file may not be sorted; 
+     * The actual file may not be sorted;
+     * 
      * @param expectedFile
      * @param actualFile
      */
-    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
         BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
         ArrayList<String> actualLines = new ArrayList<String>();
         String lineExpected, lineActual;
-        try{
-                while ( (lineActual = readerActual.readLine())!=null){
-                        actualLines.add(lineActual);
-                }
-                Collections.sort(actualLines);
-                int num = 1;
-                for(String actualLine : actualLines){
-                        lineExpected = readerExpected.readLine();
-                        if (lineExpected == null){
-                                throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
-                        }
-                        if ( !equalStrings(lineExpected, actualLine)){
-                                   throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                               + actualLine);
-                        }
-                ++num;
-                }
+        try {
+            while ((lineActual = readerActual.readLine()) != null) {
+                actualLines.add(lineActual);
+            }
+            Collections.sort(actualLines);
+            int num = 1;
+            for (String actualLine : actualLines) {
                 lineExpected = readerExpected.readLine();
-                if (lineExpected != null) {
-                    throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+                if (lineExpected == null) {
+                    throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
                 }
-        } finally{
-                readerActual.close();
-                readerExpected.close();
+                if (!equalStrings(lineExpected, actualLine)) {
+                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+                            + actualLine);
+                }
+                ++num;
+            }
+            lineExpected = readerExpected.readLine();
+            if (lineExpected != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+            }
+        } finally {
+            readerActual.close();
+            readerExpected.close();
         }
     }