Merge branch 'fullstack_genomix' of https://code.google.com/p/hyracks into fullstack_genomix

Conflicts:
	genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 1ed6f80..5be5f83 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class GeneCode {
@@ -86,6 +101,10 @@
         return -1;
     }
 
+    public static byte getBitMapFromGeneCode(byte t) {
+        return (byte) (1 << t);
+    }
+
     public static int countNumberOfBitSet(int i) {
         int c = 0;
         for (; i != 0; c++) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 0171865..1f790eb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -12,6 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package edu.uci.ics.genomix.type;
 
 import java.io.DataInput;
@@ -46,13 +47,13 @@
 
     public KmerBytesWritable(int k, byte[] storage) {
         this.kmerlength = k;
-        if (k > 0){
+        if (k > 0) {
             this.size = KmerUtil.getByteNumFromK(kmerlength);
             this.bytes = storage;
-            if (this.bytes.length < size){
+            if (this.bytes.length < size) {
                 throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
             }
-        }else{
+        } else {
             this.bytes = storage;
             this.size = 0;
         }
@@ -182,7 +183,7 @@
         int pos = ((kmerlength - 1) % 4) << 1;
         byte code = (byte) (c << pos);
         bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
-        return (byte) output;
+        return output;
     }
 
     /**
@@ -215,7 +220,7 @@
             bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
         }
         bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
-        return (byte) output;
+        return output;
     }
 
     public void set(KmerBytesWritable newData) {
@@ -248,7 +257,7 @@
 
     @Override
     public int hashCode() {
-        return super.hashCode() * this.kmerlength;
+        return super.hashCode() * 31 + this.kmerlength;
     }
 
     @Override
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
index 59cc3b0..68dec30 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class KmerUtil {
@@ -19,7 +34,7 @@
     public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
         StringBuilder strKmer = new StringBuilder();
         int byteId = keyStart + keyLength - 1;
-        if (byteId < 0){
+        if (byteId < 0) {
             return empty;
         }
         byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index a66d408..fc63fd8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class VKmerBytesWritable extends KmerBytesWritable {
@@ -10,8 +25,8 @@
     public VKmerBytesWritable() {
         super();
     }
-    
-    public VKmerBytesWritable(int k, byte[] storage){
+
+    public VKmerBytesWritable(int k, byte[] storage) {
         super(k, storage);
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
index ab1e633..9bd6acb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type;
 
 public class VKmerBytesWritableFactory {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
index ee806cc..7aa05f5 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
@@ -1,15 +1,29 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.type.old;
 
 @Deprecated
 public class Kmer {
-    
+
     public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
 
     public final static class GENE_CODE {
 
         /**
-         * make sure this 4 ids equal to the sequence id of char in
-         * {@GENE_SYMBOL}
+         * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
          */
         public static final byte A = 0;
         public static final byte C = 1;
@@ -19,22 +33,22 @@
         public static byte getCodeFromSymbol(byte ch) {
             byte r = 0;
             switch (ch) {
-            case 'A':
-            case 'a':
-                r = A;
-                break;
-            case 'C':
-            case 'c':
-                r = C;
-                break;
-            case 'G':
-            case 'g':
-                r = G;
-                break;
-            case 'T':
-            case 't':
-                r = T;
-                break;
+                case 'A':
+                case 'a':
+                    r = A;
+                    break;
+                case 'C':
+                case 'c':
+                    r = C;
+                    break;
+                case 'G':
+                case 'g':
+                    r = G;
+                    break;
+                case 'T':
+                case 't':
+                    r = T;
+                    break;
             }
             return r;
         }
@@ -49,42 +63,44 @@
         public static byte getAdjBit(byte t) {
             byte r = 0;
             switch (t) {
-            case 'A':
-            case 'a':
-                r = 1 << A;
-                break;
-            case 'C':
-            case 'c':
-                r = 1 << C;
-                break;
-            case 'G':
-            case 'g':
-                r = 1 << G;
-                break;
-            case 'T':
-            case 't':
-                r = 1 << T;
-                break;
+                case 'A':
+                case 'a':
+                    r = 1 << A;
+                    break;
+                case 'C':
+                case 'c':
+                    r = 1 << C;
+                    break;
+                case 'G':
+                case 'g':
+                    r = 1 << G;
+                    break;
+                case 'T':
+                case 't':
+                    r = 1 << T;
+                    break;
             }
             return r;
         }
 
-        /** 
-         * It works for path merge. 
+        /**
+         * It works for path merge.
          * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
-         * @param t the neighbor code in BitMap
-         * @return the genecode 
+         * 
+         * @param t
+         *            the neighbor code in BitMap
+         * @return the genecode
          */
         public static byte getGeneCodeFromBitMap(byte t) {
             switch (t) {
-            case 1 << A:
-                return A;
-            case 1 << C:
-                return C;
-            case 1 << G:
-                return G;
-            case 1 << T:
-                return T;
+                case 1 << A:
+                    return A;
+                case 1 << C:
+                    return C;
+                case 1 << G:
+                    return G;
+                case 1 << T:
+                    return T;
             }
             return -1;
         }
@@ -112,8 +128,7 @@
         }
     }
 
-    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
-            int keyLength) {
+    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
         StringBuilder strKmer = new StringBuilder();
         int byteId = keyStart + keyLength - 1;
         byte currentbyte = keyData[byteId];
@@ -277,8 +292,7 @@
         if (k % 4 != 0) {
             kmer[0] &= (1 << ((k % 4) << 1)) - 1;
         }
-        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE
-                .getCodeFromSymbol(c));
+        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
         return (byte) (1 << output);
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
index dd76427..7a96d2f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
@@ -8,8 +8,7 @@
 import org.apache.hadoop.io.WritableComparator;
 
 @Deprecated
-public class KmerBytesWritable extends BinaryComparable implements
-        WritableComparable<BinaryComparable> {
+public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
     private static final int LENGTH_BYTES = 4;
     private static final byte[] EMPTY_BYTES = {};
     private byte size;
@@ -126,8 +125,7 @@
         }
 
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
-                    s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
         }
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
index 7e9d2f3..a4b1bec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
@@ -22,16 +22,16 @@
     }
 
     /**
-     * Get last kmer from kmer-chain. 
+     * Get last kmer from kmer-chain.
      * e.g. kmerChain is AAGCTA, if k =5, it will
      * return AGCTA
+     * 
      * @param k
      * @param kInChain
      * @param kmerChain
      * @return LastKmer bytes array
      */
-    public static byte[] getLastKmerFromChain(int k, int kInChain,
-            byte[] kmerChain, int offset, int length) {
+    public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
         if (k > kInChain) {
             return null;
         }
@@ -66,8 +66,7 @@
      * @param kmerChain
      * @return FirstKmer bytes array
      */
-    public static byte[] getFirstKmerFromChain(int k, int kInChain,
-            byte[] kmerChain, int offset, int length) {
+    public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
         if (k > kInChain) {
             return null;
         }
@@ -94,9 +93,13 @@
      * Merge kmer with next neighbor in gene-code format.
      * The k of new kmer will increase by 1
      * e.g. AAGCT merge with A => AAGCTA
-     * @param k :input k of kmer
-     * @param kmer : input bytes of kmer
-     * @param nextCode: next neighbor in gene-code format
+     * 
+     * @param k
+     *            :input k of kmer
+     * @param kmer
+     *            : input bytes of kmer
+     * @param nextCode
+     *            : next neighbor in gene-code format
      * @return the merged Kmer, this K of this Kmer is k+1
      */
     public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
@@ -120,9 +123,13 @@
      * Merge kmer with previous neighbor in gene-code format.
      * The k of new kmer will increase by 1
      * e.g. AAGCT merge with A => AAAGCT
-     * @param k :input k of kmer
-     * @param kmer : input bytes of kmer
-     * @param preCode: next neighbor in gene-code format
+     * 
+     * @param k
+     *            :input k of kmer
+     * @param kmer
+     *            : input bytes of kmer
+     * @param preCode
+     *            : next neighbor in gene-code format
      * @return the merged Kmer,this K of this Kmer is k+1
      */
     public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
@@ -147,10 +154,15 @@
     /**
      * Merge two kmer to one kmer
      * e.g. ACTA + ACCGT => ACTAACCGT
-     * @param preK : previous k of kmer
-     * @param kmerPre : bytes array of previous kmer
-     * @param nextK : next k of kmer
-     * @param kmerNext : bytes array of next kmer
+     * 
+     * @param preK
+     *            : previous k of kmer
+     * @param kmerPre
+     *            : bytes array of previous kmer
+     * @param nextK
+     *            : next k of kmer
+     * @param kmerNext
+     *            : bytes array of next kmer
      * @return merged kmer, the new k is @preK + @nextK
      */
     public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
@@ -161,7 +173,7 @@
         for (; i <= lengthPre; i++) {
             mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
         }
-        if ( i > 1){
+        if (i > 1) {
             i--;
         }
         if (preK % 4 == 0) {
@@ -172,52 +184,58 @@
             int posNeedToMove = ((preK % 4) << 1);
             mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
             for (int j = 1; j < lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext
-                        - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext + lengthNext
-                        - j - 1] << posNeedToMove));
+                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
+                        + lengthNext - j - 1] << posNeedToMove));
             }
-            if ( (nextK % 4) * 2 + posNeedToMove > 8) {
+            if ((nextK % 4) * 2 + posNeedToMove > 8) {
                 mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
             }
         }
         return mergedKmer;
     }
-    
+
     /**
      * Safely shifted the kmer forward without change the input kmer
      * e.g. AGCGC shift with T => GCGCT
-     * @param k: kmer length
-     * @param kmer: input kmer
-     * @param afterCode: input genecode 
+     * 
+     * @param k
+     *            : kmer length
+     * @param kmer
+     *            : input kmer
+     * @param afterCode
+     *            : input genecode
      * @return new created kmer that shifted by afterCode, the K will not change
      */
-    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode){
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
+        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
         Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
         return shifted;
     }
-    
+
     /**
      * Safely shifted the kmer backward without change the input kmer
      * e.g. AGCGC shift with T => TAGCG
-     * @param k: kmer length
-     * @param kmer: input kmer
-     * @param preCode: input genecode 
+     * 
+     * @param k
+     *            : kmer length
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : input genecode
      * @return new created kmer that shifted by preCode, the K will not change
      */
-    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode){
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
+        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
         Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
         return shifted;
     }
 
-    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer,
-            int offset, int length) {
+    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
         if (pos >= k) {
             return -1;
         }
         int posByte = pos / 4;
-        int shift = (pos  % 4) << 1;
+        int shift = (pos % 4) << 1;
         return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
     }
 }
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
index f21da91..faee509 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.kmer;
 
 import junit.framework.Assert;
@@ -33,7 +48,7 @@
         }
 
         byte out = kmer.shiftKmerWithNextChar(array[array.length - 1]);
-        Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+        Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
         Assert.assertEquals(kmer.toString(), "ATAGAAG");
     }
 
@@ -59,7 +74,7 @@
         }
 
         byte out = kmer.shiftKmerWithPreChar(array[array.length - 1]);
-        Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+        Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
         Assert.assertEquals(kmer.toString(), "GAATAGA");
     }
 
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
index 6611752..7c4c675 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.kmer;
 
 import org.junit.Assert;
@@ -31,49 +46,49 @@
         for (int i = 8; i > 0; i--) {
             lastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
-            lastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+            lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
         }
         VKmerBytesWritable vlastKmer;
         for (int i = 8; i > 0; i--) {
             vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
-            vlastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+            vlastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
         }
     }
-    
+
     @Test
-    public void TestGetFirstKmer(){
+    public void TestGetFirstKmer() {
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
         KmerBytesWritable firstKmer;
         for (int i = 8; i > 0; i--) {
             firstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
-            firstKmer = kmerFactory.getSubKmerFromChain(0,i,kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());   
+            Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
+            firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
+            Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
         }
         VKmerBytesWritable vfirstKmer;
         for (int i = 8; i > 0; i--) {
             vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+            Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
             vfirstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
-            Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());   
+            Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
         }
     }
-    
-    @Test 
-    public void TestGetSubKmer(){
+
+    @Test
+    public void TestGetSubKmer() {
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
         VKmerBytesWritable subKmer;
-        for (int istart = 0; istart < kmer.getKmerLength()-1; istart++) {
-            for(int isize = 1; isize + istart <= kmer.getKmerLength(); isize ++){
+        for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
+            for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
                 subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
-                Assert.assertEquals("AGCTGACCG".substring(istart, istart+isize), subKmer.toString());  
+                Assert.assertEquals("AGCTGACCG".substring(istart, istart + isize), subKmer.toString());
             }
         }
     }
diff --git a/genomix/genomix-hadoop/actual3/conf.xml b/genomix/genomix-hadoop/actual3/conf.xml
index 32b17e1..9c045a4 100644
--- a/genomix/genomix-hadoop/actual3/conf.xml
+++ b/genomix/genomix-hadoop/actual3/conf.xml
@@ -12,7 +12,7 @@
 <property><name>dfs.namenode.logging.level</name><value>info</value></property>
 <property><name>dfs.datanode.address</name><value>127.0.0.1:0</value></property>
 <property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>fs.default.name</name><value>hdfs://localhost:55289</value></property>
+<property><name>fs.default.name</name><value>hdfs://localhost:55383</value></property>
 <property><name>mapred.child.tmp</name><value>./tmp</value></property>
 <property><name>fs.har.impl.disable.cache</name><value>true</value></property>
 <property><name>dfs.safemode.threshold.pct</name><value>0.999f</value></property>
@@ -125,7 +125,7 @@
 <property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
 <property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
 <property><name>dfs.namenode.decommission.interval</name><value>3</value></property>
-<property><name>dfs.http.address</name><value>localhost:55291</value></property>
+<property><name>dfs.http.address</name><value>localhost:55384</value></property>
 <property><name>dfs.heartbeat.interval</name><value>3</value></property>
 <property><name>mapred.job.tracker</name><value>local</value></property>
 <property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d36be17..f3f4584 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -87,14 +87,14 @@
             output.collect(outputKmer, outputAdjList);
             /** middle kmer */
             for (int i = KMER_SIZE; i < array.length - 1; i++) {
-                pre = outputKmer.shiftKmerWithNextChar(array[i]);
+                pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
                 next = GeneCode.getAdjBit(array[i + 1]);
                 adj = GeneCode.mergePreNextAdj(pre, next);
                 outputAdjList.set(adj, count);
                 output.collect(outputKmer, outputAdjList);
             }
             /** last kmer */
-            pre = outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
+            pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
             next = 0;
             adj = GeneCode.mergePreNextAdj(pre, next);
             outputAdjList.set(adj, count);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
index e7478a6..75ac0d1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialMapper.java
@@ -91,44 +91,47 @@
         succeed = (byte) (succeed & adjBitMap);
         boolean inDegree = measureDegree(precursor);
         boolean outDegree = measureDegree(succeed);
-        
+        if (key.toString().equals("CGC")) {
+            int a = 2;
+            int b = a;
+        }
         if (inDegree == false && outDegree == false) {
+
             outputKmer.set(key);
             bitFlag = (byte) 2;
             outputAdjList.set(null, 0, 0, adjBitMap, bitFlag, 0);
             output.collect(outputKmer, outputAdjList);
-        }
-        else{
-            for(int i = 0 ; i < 4; i ++){
+        } else {
+            for (int i = 0; i < 4; i++) {
                 byte temp = 0x01;
                 byte shiftedCode = 0;
-                temp  = (byte)(temp << i);
-                temp = (byte) (succeed & temp);
-                if(temp != 0 ){
-                    byte succeedCode = GeneCode.getGeneCodeFromBitMap(temp);
-                    shiftedCode = key.shiftKmerWithNextCode(succeedCode);
-                    outputKmer.set(key);
-                    bitFlag = (byte)0x01;
-                    outputAdjList.set(null, 0, 0, (byte)0, bitFlag, 0);
-                    output.collect(outputKmer, outputAdjList);
-                    key.shiftKmerWithPreCode(shiftedCode);
-                }
-            }
-            for(int i = 0; i < 4;i ++){
-                byte temp = 0x01;
-                byte shiftedCode = 0;
-                temp = (byte)(temp << i);
-                temp = (byte)(precursor & temp);
-                if(temp != 0){
+                temp = (byte) (temp << i);
+                temp = (byte) (precursor & temp);
+                if (temp != 0) {
                     byte precurCode = GeneCode.getGeneCodeFromBitMap(temp);
                     shiftedCode = key.shiftKmerWithPreCode(precurCode);
                     outputKmer.set(key);
-                    bitFlag = (byte)0x80;
-                    outputAdjList.set(null, 0, 0, (byte)0, bitFlag, 0);
+                    bitFlag = (byte) 0x80;
+                    outputAdjList.set(null, 0, 0, (byte) 6, bitFlag, 0);
                     output.collect(outputKmer, outputAdjList);
                     key.shiftKmerWithNextCode(shiftedCode);
                 }
             }
+            for (int i = 0; i < 4; i++) {
+                byte temp = 0x01;
+                byte shiftedCode = 0;
+                temp = (byte) (temp << i);
+                temp = (byte) (succeed & temp);
+                if (temp != 0) {
+                    byte succeedCode = GeneCode.getGeneCodeFromBitMap(temp);
+                    shiftedCode = key.shiftKmerWithNextCode(succeedCode);
+                    outputKmer.set(key);
+                    bitFlag = (byte) 0x01;
+                    outputAdjList.set(null, 0, 0, (byte) 0, bitFlag, 0);
+                    output.collect(outputKmer, outputAdjList);
+                    key.shiftKmerWithPreCode(shiftedCode);
+                }
+            }
         }
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
index 91b13c9..3d2b58c 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/pathmergingh2/SNodeInitialReducer.java
@@ -25,6 +25,10 @@
         byte targetPointFlag = 0x00;
         byte targetAdjList = 0x00;
         byte outputFlag = 0x00;
+        if(key.toString().equals("TCG")){
+            int a = 2;
+            int b = a;
+        }
         if (values.hasNext() == true) {
             switch (outputValue.getFlag()) {
                 case (byte) 0x01:
@@ -48,14 +52,14 @@
                         endFlag = 0x01;
                         break;
                     case (byte) 0x02:
-                        targetPointFlag = 0x01;
+                        targetPointFlag = 0x02;
                         targetAdjList = outputValue.getAdjBitMap();
                         break;
                 }
                 if(startFlag != 0x00 && endFlag!= 0x00 && targetPointFlag != 0x00)
                     break;
             }
-            if(targetPointFlag != 0x02) {
+            if(targetPointFlag == 0x02) {
                 if(startFlag == 0x01) {
                     outputFlag = (byte) (outputFlag | startFlag);
                 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 7b0005b..44af11b 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -71,7 +71,8 @@
         SequenceFile.Reader reader = null;
         Path path = new Path(RESULT_PATH + "/part-00000");
         reader = new SequenceFile.Reader(dfs, path, conf); 
-        KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+//        KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        KmerBytesWritable key = new KmerBytesWritable(SIZE_KMER);
         KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         File filePathTo = new File(TEST_SOURCE_DIR);
         BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
index 41e3e4b..ebf1282 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.data.std.accessors;
 
 import java.io.DataInput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index c1e9804..34c29c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.accessors;

 

 import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
index 86ca296..8aaf380 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.accessors;

 

 import java.nio.ByteBuffer;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
index d1d0054..83b9d12 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.accessors;

 

 import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index b4aa4c7..8febfa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.data.std.primitive;

 

 import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d3de2ba..ed1b926 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow;

 

 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

@@ -9,22 +24,19 @@
 

 /**

  * used by precluster groupby

- * 

  */

-public class ConnectorPolicyAssignmentPolicy implements

-		IConnectorPolicyAssignmentPolicy {

-	private static final long serialVersionUID = 1L;

-	private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

-	private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {

+    private static final long serialVersionUID = 1L;

+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

 

-	@Override

-	public IConnectorPolicy getConnectorPolicyAssignment(

-			IConnectorDescriptor c, int nProducers, int nConsumers,

-			int[] fanouts) {

-		if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

-			return senderSideMaterializePolicy;

-		} else {

-			return pipeliningPolicy;

-		}

-	}

+    @Override

+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,

+            int[] fanouts) {

+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

+            return senderSideMaterializePolicy;

+        } else {

+            return pipeliningPolicy;

+        }

+    }

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 413c73b..405e109 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow;
 
 import java.io.DataOutput;
@@ -22,68 +37,65 @@
 @SuppressWarnings("deprecation")
 public class KMerSequenceWriterFactory implements ITupleWriterFactory {
 
-	private static final long serialVersionUID = 1L;
-	private ConfFactory confFactory;
-	private final int kmerlength;
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private final int kmerlength;
 
-	public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
-		this.confFactory = new ConfFactory(conf);
-		this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-	}
+    public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+        this.confFactory = new ConfFactory(conf);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+    }
 
-	public class TupleWriter implements ITupleWriter {
-		public TupleWriter(ConfFactory cf) {
-			this.cf = cf;
-		}
+    public class TupleWriter implements ITupleWriter {
+        public TupleWriter(ConfFactory cf) {
+            this.cf = cf;
+        }
 
-		ConfFactory cf;
-		Writer writer = null;
+        ConfFactory cf;
+        Writer writer = null;
 
-		KmerCountValue reEnterCount = new KmerCountValue();
-		KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+        KmerCountValue reEnterCount = new KmerCountValue();
+        KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
 
-		/**
-		 * assumption is that output never change source!
-		 */
-		@Override
-		public void write(DataOutput output, ITupleReference tuple)
-				throws HyracksDataException {
-			try {
-				byte[] kmer = tuple.getFieldData(0);
-				int keyStart = tuple.getFieldStart(0);
-				int keyLength = tuple.getFieldLength(0);
+        /**
+         * assumption is that output never change source!
+         */
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                byte[] kmer = tuple.getFieldData(0);
+                int keyStart = tuple.getFieldStart(0);
+                int keyLength = tuple.getFieldLength(0);
 
-				byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
-				byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-				reEnterCount.set(bitmap, count);
-				reEnterKey.set(kmer, keyStart, keyLength);
-				writer.append(reEnterKey, reEnterCount);
-			} catch (IOException e) {
-				throw new HyracksDataException(e);
-			}
-		}
+                byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+                byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+                reEnterCount.set(bitmap, count);
+                reEnterKey.set(kmer, keyStart, keyLength);
+                writer.append(reEnterKey, reEnterCount);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
 
-		@Override
-		public void open(DataOutput output) throws HyracksDataException {
-			try {
-				writer = SequenceFile.createWriter(cf.getConf(),
-						(FSDataOutputStream) output, KmerBytesWritable.class,
-						KmerCountValue.class, CompressionType.NONE, null);
-			} catch (IOException e) {
-				throw new HyracksDataException(e);
-			}
-		}
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            try {
+                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+                        KmerCountValue.class, CompressionType.NONE, null);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
 
-		@Override
-		public void close(DataOutput output) throws HyracksDataException {
-			// TODO Auto-generated method stub
-		}
-	}
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+        }
+    }
 
-	@Override
-	public ITupleWriter getTupleWriter(IHyracksTaskContext ctx)
-			throws HyracksDataException {
-		return new TupleWriter(confFactory);
-	}
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter(confFactory);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 4ba38d0..cfa7262 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow;
 
 import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 79ad195..4f5b4da 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow;

 

 import java.nio.ByteBuffer;

@@ -62,13 +77,13 @@
 

                 /** middle kmer */

                 for (int i = k; i < array.length - 1; i++) {

-                    pre = kmer.shiftKmerWithNextChar(array[i]);

+                    pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));

                     next = GeneCode.getAdjBit(array[i + 1]);

                     InsertToFrame(kmer, pre, next, writer);

                 }

 

                 /** last kmer */

-                pre = kmer.shiftKmerWithNextChar(array[array.length - 1]);

+                pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));

                 next = 0;

                 InsertToFrame(kmer, pre, next, writer);

 

@@ -80,12 +95,12 @@
                     InsertToFrame(kmer, pre, next, writer);

                     /** middle kmer */

                     for (int i = k; i < array.length - 1; i++) {

-                        next = kmer.shiftKmerWithPreChar(array[i]);

+                        next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));

                         pre = GeneCode.getAdjBit(array[i + 1]);

                         InsertToFrame(kmer, pre, next, writer);

                     }

                     /** last kmer */

-                    next = kmer.shiftKmerWithPreChar(array[array.length - 1]);

+                    next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));

                     pre = 0;

                     InsertToFrame(kmer, pre, next, writer);

                 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 62680ed..ea70fb0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow.aggregators;

 

 import java.io.DataOutput;

@@ -15,126 +30,110 @@
 

 /**

  * sum

- * 

  */

-public class DistributedMergeLmerAggregateFactory implements

-		IAggregatorDescriptorFactory {

-	private static final long serialVersionUID = 1L;

+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

+    private static final long serialVersionUID = 1L;

 

-	public DistributedMergeLmerAggregateFactory() {

-	}

+    public DistributedMergeLmerAggregateFactory() {

+    }

 

-	public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

-		private static final int MAX = 127;

+    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

+        private static final int MAX = 127;

 

-		@Override

-		public void reset() {

-		}

+        @Override

+        public void reset() {

+        }

 

-		@Override

-		public void close() {

-			// TODO Auto-generated method stub

+        @Override

+        public void close() {

+            // TODO Auto-generated method stub

 

-		}

+        }

 

-		@Override

-		public AggregateState createAggregateStates() {

-			return new AggregateState(new Object() {

-			});

-		}

+        @Override

+        public AggregateState createAggregateStates() {

+            return new AggregateState(new Object() {

+            });

+        }

 

-		protected byte getField(IFrameTupleAccessor accessor, int tIndex,

-				int fieldId) {

-			int tupleOffset = accessor.getTupleStartOffset(tIndex);

-			int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

-			int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-			byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()

-					.array(), offset);

-			return data;

-		}

-		

-		@Override

-		public void init(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			byte count = getField(accessor, tIndex, 2);

+        protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {

+            int tupleOffset = accessor.getTupleStartOffset(tIndex);

+            int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

+            int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

+            byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+            return data;

+        }

 

-			DataOutput fieldOutput = tupleBuilder.getDataOutput();

-			try {

-				fieldOutput.writeByte(bitmap);

-				tupleBuilder.addFieldEndOffset();

-				fieldOutput.writeByte(count);

-				tupleBuilder.addFieldEndOffset();

-			} catch (IOException e) {

-				throw new HyracksDataException(

-						"I/O exception when initializing the aggregator.");

-			}

-		}

+        @Override

+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)

+                throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            byte count = getField(accessor, tIndex, 2);

 

-		@Override

-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,

-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,

-				AggregateState state) throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			short count = getField(accessor, tIndex, 2);

+            DataOutput fieldOutput = tupleBuilder.getDataOutput();

+            try {

+                fieldOutput.writeByte(bitmap);

+                tupleBuilder.addFieldEndOffset();

+                fieldOutput.writeByte(count);

+                tupleBuilder.addFieldEndOffset();

+            } catch (IOException e) {

+                throw new HyracksDataException("I/O exception when initializing the aggregator.");

+            }

+        }

 

-			int statetupleOffset = stateAccessor

-					.getTupleStartOffset(stateTupleIndex);

-			int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,

-					1);

-			int countfieldStart = stateAccessor.getFieldStartOffset(

-					stateTupleIndex, 2);

-			int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()

-					+ bitfieldStart;

-			int countoffset = statetupleOffset

-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;

+        @Override

+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

+                int stateTupleIndex, AggregateState state) throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            short count = getField(accessor, tIndex, 2);

 

-			byte[] data = stateAccessor.getBuffer().array();

+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

+            int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);

+            int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;

+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;

 

-			bitmap |= data[bitoffset];

-			count += data[countoffset];

-			if (count >= MAX) {

-				count = (byte) MAX;

-			}

-			data[bitoffset] = bitmap;

-			data[countoffset] = (byte) count;

-		}

+            byte[] data = stateAccessor.getBuffer().array();

 

-		@Override

-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			byte bitmap = getField(accessor, tIndex, 1);

-			byte count = getField(accessor, tIndex, 2);

-			DataOutput fieldOutput = tupleBuilder.getDataOutput();

-			try {

-				fieldOutput.writeByte(bitmap);

-				tupleBuilder.addFieldEndOffset();

-				fieldOutput.writeByte(count);

-				tupleBuilder.addFieldEndOffset();

-			} catch (IOException e) {

-				throw new HyracksDataException(

-						"I/O exception when writing aggregation to the output buffer.");

-			}

+            bitmap |= data[bitoffset];

+            count += data[countoffset];

+            if (count >= MAX) {

+                count = (byte) MAX;

+            }

+            data[bitoffset] = bitmap;

+            data[countoffset] = (byte) count;

+        }

 

-		}

+        @Override

+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                AggregateState state) throws HyracksDataException {

+            byte bitmap = getField(accessor, tIndex, 1);

+            byte count = getField(accessor, tIndex, 2);

+            DataOutput fieldOutput = tupleBuilder.getDataOutput();

+            try {

+                fieldOutput.writeByte(bitmap);

+                tupleBuilder.addFieldEndOffset();

+                fieldOutput.writeByte(count);

+                tupleBuilder.addFieldEndOffset();

+            } catch (IOException e) {

+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

+            }

 

-		@Override

-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-				throws HyracksDataException {

-			outputPartialResult(tupleBuilder, accessor, tIndex, state);

-		}

-		

-	}

+        }

 

-	@Override

-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

-			RecordDescriptor inRecordDescriptor,

-			RecordDescriptor outRecordDescriptor, int[] keyFields,

-			int[] keyFieldsInPartialResults) throws HyracksDataException {

-		return new DistributeAggregatorDescriptor();

-	}

+        @Override

+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

+                AggregateState state) throws HyracksDataException {

+            outputPartialResult(tupleBuilder, accessor, tIndex, state);

+        }

+

+    }

+

+    @Override

+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

+            throws HyracksDataException {

+        return new DistributeAggregatorDescriptor();

+    }

 

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index 330f950..87c0207 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.dataflow.aggregators;
 
 import java.io.DataOutput;
@@ -11,106 +26,93 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 
 public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
-	private static final int MAX = 127;
+    private static final int MAX = 127;
 
-	@Override
-	public void reset() {
-	}
+    @Override
+    public void reset() {
+    }
 
-	@Override
-	public void close() {
-		// TODO Auto-generated method stub
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
 
-	}
+    }
 
-	@Override
-	public AggregateState createAggregateStates() {
-		return new AggregateState(new Object() {
-		});
-	}
+    @Override
+    public AggregateState createAggregateStates() {
+        return new AggregateState(new Object() {
+        });
+    }
 
-	protected byte getField(IFrameTupleAccessor accessor, int tIndex,
-			int fieldId) {
-		int tupleOffset = accessor.getTupleStartOffset(tIndex);
-		int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-		int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
-		byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
-				.array(), offset);
-		return data;
-	}
+    protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+        int tupleOffset = accessor.getTupleStartOffset(tIndex);
+        int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+        int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+        byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+        return data;
+    }
 
-	@Override
-	public void init(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		byte count = 1;
+    @Override
+    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        byte count = 1;
 
-		DataOutput fieldOutput = tupleBuilder.getDataOutput();
-		try {
-			fieldOutput.writeByte(bitmap);
-			tupleBuilder.addFieldEndOffset();
-			fieldOutput.writeByte(count);
-			tupleBuilder.addFieldEndOffset();
-		} catch (IOException e) {
-			throw new HyracksDataException(
-					"I/O exception when initializing the aggregator.");
-		}
-	}
+        DataOutput fieldOutput = tupleBuilder.getDataOutput();
+        try {
+            fieldOutput.writeByte(bitmap);
+            tupleBuilder.addFieldEndOffset();
+            fieldOutput.writeByte(count);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException("I/O exception when initializing the aggregator.");
+        }
+    }
 
-	@Override
-	public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-			IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-			AggregateState state) throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		short count = 1;
+    @Override
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+            int stateTupleIndex, AggregateState state) throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        short count = 1;
 
-		int statetupleOffset = stateAccessor
-				.getTupleStartOffset(stateTupleIndex);
-		int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
-				1);
-		int countfieldStart = stateAccessor.getFieldStartOffset(
-				stateTupleIndex, 2);
-		int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
-				+ bitfieldStart;
-		int countoffset = statetupleOffset
-				+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+        int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+        int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+        int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+        int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		byte[] data = stateAccessor.getBuffer().array();
+        byte[] data = stateAccessor.getBuffer().array();
 
-		bitmap |= data[bitoffset];
-		count += data[countoffset];
-		if (count >= MAX) {
-			count = (byte) MAX;
-		}
-		data[bitoffset] = bitmap;
-		data[countoffset] = (byte) count;
-	}
+        bitmap |= data[bitoffset];
+        count += data[countoffset];
+        if (count >= MAX) {
+            count = (byte) MAX;
+        }
+        data[bitoffset] = bitmap;
+        data[countoffset] = (byte) count;
+    }
 
-	@Override
-	public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		byte bitmap = getField(accessor, tIndex, 1);
-		byte count = getField(accessor, tIndex, 2);
-		DataOutput fieldOutput = tupleBuilder.getDataOutput();
-		try {
-			fieldOutput.writeByte(bitmap);
-			tupleBuilder.addFieldEndOffset();
-			fieldOutput.writeByte(count);
-			tupleBuilder.addFieldEndOffset();
-		} catch (IOException e) {
-			throw new HyracksDataException(
-					"I/O exception when writing aggregation to the output buffer.");
-		}
+    @Override
+    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        byte bitmap = getField(accessor, tIndex, 1);
+        byte count = getField(accessor, tIndex, 2);
+        DataOutput fieldOutput = tupleBuilder.getDataOutput();
+        try {
+            fieldOutput.writeByte(bitmap);
+            tupleBuilder.addFieldEndOffset();
+            fieldOutput.writeByte(count);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+        }
 
-	}
+    }
 
-	@Override
-	public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-			throws HyracksDataException {
-		outputPartialResult(tupleBuilder, accessor, tIndex, state);
-	}
+    @Override
+    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        outputPartialResult(tupleBuilder, accessor, tIndex, state);
+    }
 
 };
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 58ff8a2..b5eb70f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*

+ * Copyright 2009-2012 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

 package edu.uci.ics.genomix.dataflow.aggregators;

 

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

@@ -8,20 +23,18 @@
 

 /**

  * count

- * 

  */

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

-	private static final long serialVersionUID = 1L;

+    private static final long serialVersionUID = 1L;

 

-	public MergeKmerAggregateFactory() {

-	}

+    public MergeKmerAggregateFactory() {

+    }

 

-	@Override

-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

-			RecordDescriptor inRecordDescriptor,

-			RecordDescriptor outRecordDescriptor, int[] keyFields,

-			int[] keyFieldsInPartialResults) throws HyracksDataException {

-		return new LocalAggregatorDescriptor();

-	}

+    @Override

+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

+            throws HyracksDataException {

+        return new LocalAggregatorDescriptor();

+    }

 

 }

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 99d6c89..27d5e15 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.driver;
 
 import java.net.URL;
@@ -22,122 +37,112 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class Driver {
-	public static enum Plan {
-		BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
-	}
+    public static enum Plan {
+        BUILD_DEBRUJIN_GRAPH,
+        GRAPH_CLEANNING,
+        CONTIGS_GENERATION,
+    }
 
-	private static final String IS_PROFILING = "genomix.driver.profiling";
-	private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
-	private static final String applicationName = GenomixJob.JOB_NAME;
-	private static final Log LOG = LogFactory.getLog(Driver.class);
-	private JobGen jobGen;
-	private boolean profiling;
+    private static final String IS_PROFILING = "genomix.driver.profiling";
+    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
 
-	private int numPartitionPerMachine;
+    private int numPartitionPerMachine;
 
-	private IHyracksClientConnection hcc;
-	private Scheduler scheduler;
+    private IHyracksClientConnection hcc;
+    private Scheduler scheduler;
 
-	public Driver(String ipAddress, int port, int numPartitionPerMachine)
-			throws HyracksException {
-		try {
-			hcc = new HyracksConnection(ipAddress, port);
-			scheduler = new Scheduler(hcc.getNodeControllerInfos());
-		} catch (Exception e) {
-			throw new HyracksException(e);
-		}
-		this.numPartitionPerMachine = numPartitionPerMachine;
-	}
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+        try {
+            hcc = new HyracksConnection(ipAddress, port);
+            scheduler = new Scheduler(hcc.getNodeControllerInfos());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.numPartitionPerMachine = numPartitionPerMachine;
+    }
 
-	public void runJob(GenomixJob job) throws HyracksException {
-		runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
-	}
+    public void runJob(GenomixJob job) throws HyracksException {
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+    }
 
-	public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
-			throws HyracksException {
-		/** add hadoop configurations */
-		URL hadoopCore = job.getClass().getClassLoader()
-				.getResource("core-site.xml");
-		job.addResource(hadoopCore);
-		URL hadoopMapRed = job.getClass().getClassLoader()
-				.getResource("mapred-site.xml");
-		job.addResource(hadoopMapRed);
-		URL hadoopHdfs = job.getClass().getClassLoader()
-				.getResource("hdfs-site.xml");
-		job.addResource(hadoopHdfs);
+    public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+        /** add hadoop configurations */
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.addResource(hadoopHdfs);
 
-		LOG.info("job started");
-		long start = System.currentTimeMillis();
-		long end = start;
-		long time = 0;
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
 
-		this.profiling = profiling;
-		try {
-			Map<String, NodeControllerInfo> ncMap = hcc
-					.getNodeControllerInfos();
-			LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
-			switch (planChoice) {
-			case BUILD_DEBRUJIN_GRAPH:
-			default:
-				jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
-						numPartitionPerMachine);
-				break;
-			}
+        this.profiling = profiling;
+        try {
+            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+            switch (planChoice) {
+                case BUILD_DEBRUJIN_GRAPH:
+                default:
+                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+            }
 
-			start = System.currentTimeMillis();
-			runCreate(jobGen);
-			end = System.currentTimeMillis();
-			time = end - start;
-			LOG.info("result writing finished " + time + "ms");
-			LOG.info("job finished");
-		} catch (Exception e) {
-			throw new HyracksException(e);
-		}
-	}
+            start = System.currentTimeMillis();
+            runCreate(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
 
-	private void runCreate(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification createJob = jobGen.generateJob();
-			execute(createJob);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw e;
-		}
-	}
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification createJob = jobGen.generateJob();
+            execute(createJob);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
 
-	private void execute(JobSpecification job) throws Exception {
-		job.setUseConnectorPolicyForScheduling(false);
-		JobId jobId = hcc.startJob(
-				job,
-				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
-						.noneOf(JobFlag.class));
-		hcc.waitForCompletion(jobId);
-	}
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
 
-	public static void main(String[] args) throws Exception {
-		GenomixJob jobConf = new GenomixJob();
-		String[] otherArgs = new GenericOptionsParser(jobConf, args)
-				.getRemainingArgs();
-		if (otherArgs.length < 4) {
-			System.err.println("Need <serverIP> <port> <input> <output>");
-			System.exit(-1);
-		}
-		String ipAddress = otherArgs[0];
-		int port = Integer.parseInt(otherArgs[1]);
-		int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
-		boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
-		// FileInputFormat.setInputPaths(job, otherArgs[2]);
-		{
-			Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
-			jobConf.set("mapred.input.dir", path.toString());
+    public static void main(String[] args) throws Exception {
+        GenomixJob jobConf = new GenomixJob();
+        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+        if (otherArgs.length < 4) {
+            System.err.println("Need <serverIP> <port> <input> <output>");
+            System.exit(-1);
+        }
+        String ipAddress = otherArgs[0];
+        int port = Integer.parseInt(otherArgs[1]);
+        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+        // FileInputFormat.setInputPaths(job, otherArgs[2]);
+        {
+            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+            jobConf.set("mapred.input.dir", path.toString());
 
-			Path outputDir = new Path(jobConf.getWorkingDirectory(),
-					otherArgs[3]);
-			jobConf.set("mapred.output.dir", outputDir.toString());
-		}
-		// FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
-		// FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
-		Driver driver = new Driver(ipAddress, port, numOfDuplicate);
-		driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
-	}
+            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+            jobConf.set("mapred.output.dir", outputDir.toString());
+        }
+        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 11786f3..be82477 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.io.IOException;
@@ -7,73 +22,73 @@
 
 public class GenomixJob extends JobConf {
 
-	public static final String JOB_NAME = "genomix";
+    public static final String JOB_NAME = "genomix";
 
-	/** Kmers length */
-	public static final String KMER_LENGTH = "genomix.kmer";
-	/** Frame Size */
-	public static final String FRAME_SIZE = "genomix.framesize";
-	/** Frame Limit, hyracks need */
-	public static final String FRAME_LIMIT = "genomix.framelimit";
-	/** Table Size, hyracks need */
-	public static final String TABLE_SIZE = "genomix.tablesize";
-	/** Groupby types */
-	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
-	/** Graph outputformat */
-	public static final String OUTPUT_FORMAT = "genomix.graph.output";
-	/** Get reversed Kmer Sequence */
-	public static final String REVERSED_KMER = "genomix.kmer.reversed";
+    /** Kmers length */
+    public static final String KMER_LENGTH = "genomix.kmer";
+    /** Frame Size */
+    public static final String FRAME_SIZE = "genomix.framesize";
+    /** Frame Limit, hyracks need */
+    public static final String FRAME_LIMIT = "genomix.framelimit";
+    /** Table Size, hyracks need */
+    public static final String TABLE_SIZE = "genomix.tablesize";
+    /** Groupby types */
+    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+    /** Graph outputformat */
+    public static final String OUTPUT_FORMAT = "genomix.graph.output";
+    /** Get reversed Kmer Sequence */
+    public static final String REVERSED_KMER = "genomix.kmer.reversed";
 
-	/** Configurations used by hybrid groupby function in graph build phrase */
-	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
-	public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
-	public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
-	public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
-	public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+    /** Configurations used by hybrid groupby function in graph build phrase */
+    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
 
-	public static final int DEFAULT_KMER = 21;
-	public static final int DEFAULT_FRAME_SIZE = 32768;
-	public static final int DEFAULT_FRAME_LIMIT = 4096;
-	public static final int DEFAULT_TABLE_SIZE = 10485767;
-	public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
-	public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
-	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
-	public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
-	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+    public static final int DEFAULT_KMER = 21;
+    public static final int DEFAULT_FRAME_SIZE = 32768;
+    public static final int DEFAULT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_TABLE_SIZE = 10485767;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
 
-	public static final boolean DEFAULT_REVERSED = false;
+    public static final boolean DEFAULT_REVERSED = false;
 
-	public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
-	public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+    public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+    public static final String DEFAULT_OUTPUT_FORMAT = "binary";
 
-	public GenomixJob() throws IOException {
-		super(new Configuration());
-	}
+    public GenomixJob() throws IOException {
+        super(new Configuration());
+    }
 
-	public GenomixJob(Configuration conf) throws IOException {
-		super(conf);
-	}
+    public GenomixJob(Configuration conf) throws IOException {
+        super(conf);
+    }
 
-	/**
-	 * Set the kmer length
-	 * 
-	 * @param the
-	 *            desired frame size
-	 */
-	final public void setKmerLength(int kmerlength) {
-		setInt(KMER_LENGTH, kmerlength);
-	}
+    /**
+     * Set the kmer length
+     * 
+     * @param the
+     *            desired frame size
+     */
+    final public void setKmerLength(int kmerlength) {
+        setInt(KMER_LENGTH, kmerlength);
+    }
 
-	final public void setFrameSize(int frameSize) {
-		setInt(FRAME_SIZE, frameSize);
-	}
+    final public void setFrameSize(int frameSize) {
+        setInt(FRAME_SIZE, frameSize);
+    }
 
-	final public void setFrameLimit(int frameLimit) {
-		setInt(FRAME_LIMIT, frameLimit);
-	}
+    final public void setFrameLimit(int frameLimit) {
+        setInt(FRAME_LIMIT, frameLimit);
+    }
 
-	final public void setTableSize(int tableSize) {
-		setInt(TABLE_SIZE, tableSize);
-	}
+    final public void setTableSize(int tableSize) {
+        setInt(TABLE_SIZE, tableSize);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 557da6b..b1f9a29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.util.UUID;
@@ -9,19 +24,18 @@
 
 public abstract class JobGen {
 
-	protected final Configuration conf;
-	protected final GenomixJob genomixJob;
-	protected String jobId = new UUID(System.currentTimeMillis(),
-			System.nanoTime()).toString();
+    protected final Configuration conf;
+    protected final GenomixJob genomixJob;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
 
-	public JobGen(GenomixJob job) {
-		this.conf = job;
-		this.genomixJob = job;
-		this.initJobConfiguration();
-	}
+    public JobGen(GenomixJob job) {
+        this.conf = job;
+        this.genomixJob = job;
+        this.initJobConfiguration();
+    }
 
-	protected abstract void initJobConfiguration();
+    protected abstract void initJobConfiguration();
 
-	public abstract JobSpecification generateJob() throws HyracksException;
+    public abstract JobSpecification generateJob() throws HyracksException;
 
 }
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 9462fb0..79b38e8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import java.util.Map;
@@ -47,292 +62,234 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenBrujinGraph extends JobGen {
-	public enum GroupbyType {
-		EXTERNAL, PRECLUSTER, HYBRIDHASH,
-	}
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
 
-	public enum OutputFormat {
-		TEXT, BINARY,
-	}
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
 
-	JobConf job;
-	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-	private Scheduler scheduler;
-	private String[] ncNodeNames;
+    JobConf job;
+    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private Scheduler scheduler;
+    private String[] ncNodeNames;
 
-	private int kmers;
-	private int frameLimits;
-	private int frameSize;
-	private int tableSize;
-	private GroupbyType groupbyType;
-	private OutputFormat outputFormat;
-	private boolean bGenerateReversedKmer;
+    private int kmers;
+    private int frameLimits;
+    private int frameSize;
+    private int tableSize;
+    private GroupbyType groupbyType;
+    private OutputFormat outputFormat;
+    private boolean bGenerateReversedKmer;
 
-	private AbstractOperatorDescriptor singleGrouper;
-	private IConnectorDescriptor connPartition;
-	private AbstractOperatorDescriptor crossGrouper;
-	private RecordDescriptor readOutputRec;
-	private RecordDescriptor combineOutputRec;
+    private AbstractOperatorDescriptor singleGrouper;
+    private IConnectorDescriptor connPartition;
+    private AbstractOperatorDescriptor crossGrouper;
+    private RecordDescriptor readOutputRec;
+    private RecordDescriptor combineOutputRec;
 
-	/** works for hybrid hashing */
-	private long inputSizeInRawRecords;
-	private long inputSizeInUniqueKeys;
-	private int recordSizeInBytes;
-	private int hashfuncStartLevel;
+    /** works for hybrid hashing */
+    private long inputSizeInRawRecords;
+    private long inputSizeInUniqueKeys;
+    private int recordSizeInBytes;
+    private int hashfuncStartLevel;
 
-	private void logDebug(String status) {
-		String names = "";
-		for (String str : ncNodeNames) {
-			names += str + " ";
-		}
-		LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
-	}
+    private void logDebug(String status) {
+        String names = "";
+        for (String str : ncNodeNames) {
+            names += str + " ";
+        }
+        LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+    }
 
-	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
-			final Map<String, NodeControllerInfo> ncMap,
-			int numPartitionPerMachine) {
-		super(job);
-		this.scheduler = scheduler;
-		String[] nodes = new String[ncMap.size()];
-		ncMap.keySet().toArray(nodes);
-		ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-		for (int i = 0; i < numPartitionPerMachine; i++) {
-			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
-					nodes.length);
-		}
-		logDebug("initialize");
-	}
+    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) {
+        super(job);
+        this.scheduler = scheduler;
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        logDebug("initialize");
+    }
 
-	private ExternalGroupOperatorDescriptor newExternalGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			IAggregatorDescriptorFactory aggeragater) {
-		return new ExternalGroupOperatorDescriptor(
-				jobSpec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(KmerPointable.FACTORY) },
-				new KmerNormarlizedComputerFactory(),
-				aggeragater,
-				new DistributedMergeLmerAggregateFactory(),
-				combineOutputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-										.of(KmerPointable.FACTORY) }),
-						tableSize), true);
-	}
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+                combineOutputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(KmerPointable.FACTORY) }), tableSize), true);
+    }
 
-	private HybridHashGroupOperatorDescriptor newHybridGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			long inputSizeInRawRecords, long inputSizeInUniqueKeys,
-			int recordSizeInBytes, int hashfuncStartLevel,
-			IAggregatorDescriptorFactory aggeragater)
-			throws HyracksDataException {
-		return new HybridHashGroupOperatorDescriptor(
-				jobSpec,
-				keyFields,
-				frameLimits,
-				inputSizeInRawRecords,
-				inputSizeInUniqueKeys,
-				recordSizeInBytes,
-				tableSize,
-				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(KmerPointable.FACTORY) },
-				new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
-				hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
-				aggeragater, new DistributedMergeLmerAggregateFactory(),
-				combineOutputRec, true);
-	}
+    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+            IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
+        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+                combineOutputRec, true);
+    }
 
-	private void generateDescriptorbyType(JobSpecification jobSpec)
-			throws HyracksDataException {
-		int[] keyFields = new int[] { 0 }; // the id of grouped key
+    private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // field index of the group-by key (the kmer)
 
-		switch (groupbyType) {
-		case EXTERNAL:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
-					new KmerHashPartitioncomputerFactory());
-			crossGrouper = newExternalGroupby(jobSpec, keyFields,
-					new DistributedMergeLmerAggregateFactory());
-			break;
-		case PRECLUSTER:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningMergingConnectorDescriptor(
-					jobSpec,
-					new KmerHashPartitioncomputerFactory(),
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(KmerPointable.FACTORY) });
-			crossGrouper = new PreclusteredGroupOperatorDescriptor(
-					jobSpec,
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(KmerPointable.FACTORY) },
-					new DistributedMergeLmerAggregateFactory(),
-					combineOutputRec);
-			break;
-		case HYBRIDHASH:
-		default:
-			singleGrouper = newHybridGroupby(jobSpec, keyFields,
-					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel,
-					new MergeKmerAggregateFactory());
-			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
-					new KmerHashPartitioncomputerFactory());
+        switch (groupbyType) {
+            case EXTERNAL:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+                crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
+                break;
+            case PRECLUSTER:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                        new KmerHashPartitioncomputerFactory(), keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                        new DistributedMergeLmerAggregateFactory(), combineOutputRec);
+                break;
+            case HYBRIDHASH:
+            default:
+                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
 
-			crossGrouper = newHybridGroupby(jobSpec, keyFields,
-					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel,
-					new DistributedMergeLmerAggregateFactory());
-			break;
-		}
-	}
+                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
+                break;
+        }
+    }
 
-	public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
-			throws HyracksDataException {
-		try {
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
 
-			InputSplit[] splits = job.getInputFormat().getSplits(job,
-					ncNodeNames.length);
+            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
 
-			LOG.info("HDFS read into " + splits.length + " splits");
-			String[] readSchedule = scheduler.getLocationConstraints(splits);
-			String log = "";
-			for (String schedule : readSchedule) {
-				log += schedule + " ";
-			}
-			LOG.info("HDFS read schedule " + log);
-			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
-					splits, readSchedule, new ReadsKeyValueParserFactory(kmers,
-							bGenerateReversedKmer));
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+            LOG.info("HDFS read into " + splits.length + " splits");
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            String log = "";
+            for (String schedule : readSchedule) {
+                log += schedule + " ";
+            }
+            LOG.info("HDFS read schedule " + log);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	@Override
-	public JobSpecification generateJob() throws HyracksException {
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
 
-		JobSpecification jobSpec = new JobSpecification();
-		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE });
-		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE,
-				ByteSerializerDeserializer.INSTANCE });
-		jobSpec.setFrameSize(frameSize);
+        JobSpecification jobSpec = new JobSpecification();
+        readOutputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+        jobSpec.setFrameSize(frameSize);
 
-		// File input
-		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+        // File input
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
 
-		logDebug("Read Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				readOperator, ncNodeNames);
+        logDebug("Read Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
 
-		generateDescriptorbyType(jobSpec);
-		logDebug("SingleGroupby Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				singleGrouper, ncNodeNames);
+        generateDescriptorbyType(jobSpec);
+        logDebug("SingleGroupby Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
 
-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
 
-		logDebug("CrossGrouper Operator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				crossGrouper, ncNodeNames);
-		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+        logDebug("CrossGrouper Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
 
-		// Output
-		ITupleWriterFactory writer = null;
-		switch (outputFormat) {
-		case TEXT:
-			writer = new KMerTextWriterFactory(kmers);
-			break;
-		case BINARY:
-		default:
-			writer = new KMerSequenceWriterFactory(job);
-			break;
-		}
-		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
-				jobSpec, job, writer);
+        // Output
+        ITupleWriterFactory writer = null;
+        switch (outputFormat) {
+            case TEXT:
+                writer = new KMerTextWriterFactory(kmers);
+                break;
+            case BINARY:
+            default:
+                writer = new KMerSequenceWriterFactory(job);
+                break;
+        }
+        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
 
-		logDebug("WriteOperator");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				writeOperator, ncNodeNames);
+        logDebug("WriteOperator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
 
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
-		jobSpec.addRoot(writeOperator);
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
+        jobSpec.addRoot(writeOperator);
 
-		if (groupbyType == GroupbyType.PRECLUSTER) {
-			jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-		}
-		return jobSpec;
-	}
+        if (groupbyType == GroupbyType.PRECLUSTER) {
+            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return jobSpec;
+    }
 
-	@Override
-	protected void initJobConfiguration() {
+    @Override
+    protected void initJobConfiguration() {
 
-		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-		if (kmers % 2 == 0){
-		    kmers--;
-		    conf.setInt(GenomixJob.KMER_LENGTH, kmers);
-		}
-		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
-				GenomixJob.DEFAULT_FRAME_LIMIT);
-		tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
-				GenomixJob.DEFAULT_TABLE_SIZE);
-		frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
-				GenomixJob.DEFAULT_FRAME_SIZE);
-		inputSizeInRawRecords = conf.getLong(
-				GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
-		inputSizeInUniqueKeys = conf.getLong(
-				GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
-		recordSizeInBytes = conf.getInt(
-				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
-		hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
-		/** here read the different recordSize why ? */
-		recordSizeInBytes = conf.getInt(
-				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
-				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        if (kmers % 2 == 0) {
+            kmers--;
+            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+        }
+        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+        /* NOTE(review): this overwrites the RECORDSIZE_SINGLE value read above with RECORDSIZE_CROSS, making the first assignment dead — confirm which record size the hybrid groupers should use. */
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
 
-		bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER,
-				GenomixJob.DEFAULT_REVERSED);
+        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
 
-		String type = conf.get(GenomixJob.GROUPBY_TYPE,
-				GenomixJob.DEFAULT_GROUPBY_TYPE);
-		if (type.equalsIgnoreCase("external")) {
-			groupbyType = GroupbyType.EXTERNAL;
-		} else if (type.equalsIgnoreCase("precluster")) {
-			groupbyType = GroupbyType.PRECLUSTER;
-		} else {
-			groupbyType = GroupbyType.HYBRIDHASH;
-		}
+        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+        if (type.equalsIgnoreCase("external")) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase("precluster")) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
 
-		String output = conf.get(GenomixJob.OUTPUT_FORMAT,
-				GenomixJob.DEFAULT_OUTPUT_FORMAT);
-		if (output.equalsIgnoreCase("text")) {
-			outputFormat = OutputFormat.TEXT;
-		} else {
-			outputFormat = OutputFormat.BINARY;
-		}
-		job = new JobConf(conf);
-		LOG.info("Genomix Graph Build Configuration");
-		LOG.info("Kmer:" + kmers);
-		LOG.info("Groupby type:" + type);
-		LOG.info("Output format:" + output);
-		LOG.info("Frame limit" + frameLimits);
-		LOG.info("Frame size" + frameSize);
-	}
+        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        job = new JobConf(conf);
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmers);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame size" + frameSize);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
index 39cc6fc..a88fa79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.job;
 
 import org.apache.hadoop.mapred.InputSplit;
@@ -21,6 +36,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
@@ -34,146 +50,122 @@
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenStatistic extends JobGen {
-	private int kmers;
-	private JobConf hadoopjob;
-	private RecordDescriptor readOutputRec;
-	private String[] ncNodeNames;
-	private Scheduler scheduler;
-	private RecordDescriptor combineOutputRec;
-	
+    private int kmers;
+    private JobConf hadoopjob;
+    private RecordDescriptor readOutputRec;
+    private String[] ncNodeNames;
+    private Scheduler scheduler;
+    private RecordDescriptor combineOutputRec;
 
-	public JobGenStatistic(GenomixJob job) {
-		super(job);
-		// TODO Auto-generated constructor stub
-	}
+    public JobGenStatistic(GenomixJob job) {
+        super(job);
+        // Intentionally empty: super(job) already triggers initJobConfiguration().
+    }
 
-	@Override
-	protected void initJobConfiguration() {
-		// TODO Auto-generated method stub
-		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-		hadoopjob = new JobConf(conf);
-		hadoopjob.setInputFormat(SequenceFileInputFormat.class);
-	}
+    @Override
+    protected void initJobConfiguration() {
+        // Reads the kmer length and wraps conf in a JobConf set up for sequence-file input.
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        hadoopjob = new JobConf(conf);
+        hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+    }
 
-	@Override
-	public JobSpecification generateJob() throws HyracksException {
-		// TODO Auto-generated method stub
-		int[] degreeFields = { 0 };
-		int[] countFields = { 1 };
-		JobSpecification jobSpec = new JobSpecification();
-		/** specify the record fields after read */
-		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE,ByteSerializerDeserializer.INSTANCE });
-		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				null, ByteSerializerDeserializer.INSTANCE });
-		/** the reader */
-		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				readOperator, ncNodeNames);
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+        int[] degreeFields = { 0, 1 }; // indegree, outdegree
+        int[] countFields = { 2 };
+        JobSpecification jobSpec = new JobSpecification();
+        /** specify the record fields after read */
+        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        /** the reader */
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
 
-		/** the combiner aggregator */
-		AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
-				jobSpec, degreeFields, readOperator);
-		AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
-				jobSpec, countFields, readOperator);
+        /** the combiner aggregator */
+        AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+        AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
 
-		/** the final aggregator */
-		AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
-				jobSpec, degreeFields, degreeLocal);
-		AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
-				jobSpec, countFields, countLocal);
-		
-		/** writer */
-		AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
-				jobSpec, degreeFields, degreeMerger);
-		AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
-				jobSpec, countFields, countMerger);
-		jobSpec.addRoot(writeDegree);
-		jobSpec.addRoot(writeCount);
-		return null;
-	}
+        /** the final aggregator */
+        AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+        AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
 
-	private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
-			throws HyracksDataException {
-		try {
+        /** writer */
+        AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+        AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+        jobSpec.addRoot(writeDegree);
+        jobSpec.addRoot(writeCount);
+        return null;
+    }
 
-			InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
-					hadoopjob, ncNodeNames.length);
+    private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
 
-			String[] readSchedule = scheduler.getLocationConstraints(splits);
-			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
-					hadoopjob, splits, readSchedule,
-					new StatReadsKeyValueParserFactory());
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+            InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
 
-	private ExternalGroupOperatorDescriptor newExternalGroupby(
-			JobSpecification jobSpec, int[] keyFields,
-			IAggregatorDescriptorFactory aggeragater) {
-		return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
-				GenomixJob.DEFAULT_FRAME_LIMIT, new IBinaryComparatorFactory[] {
-						new ByteComparatorFactory() }, null, aggeragater,
-				new StatSumAggregateFactory(),
-				combineOutputRec, new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(keyFields,
-								new IBinaryHashFunctionFactory[] {
-										new ByteComparatorFactory() }),
-						GenomixJob.DEFAULT_TABLE_SIZE), true);
-	}
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+                    new StatReadsKeyValueParserFactory());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	private AbstractOperatorDescriptor connectLocalAggregateByField(
-			JobSpecification jobSpec, int[] fields,
-			HDFSReadOperatorDescriptor readOperator) {
-		AbstractOperatorDescriptor localAggregator = newExternalGroupby(
-				jobSpec, fields, new StatCountAggregateFactory());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				localAggregator, ncNodeNames);
-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
-		return localAggregator;
-	}
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+                new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+                aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                                new ByteComparatorFactory(), new ByteComparatorFactory() }),
+                        GenomixJob.DEFAULT_TABLE_SIZE), true);
+    }
 
-	private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
-			int[] fields, AbstractOperatorDescriptor localAggregator) {
-		AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
-				jobSpec, fields, new StatSumAggregateFactory());
-		// only need one reducer
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
-		IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
-				jobSpec,
-				new ITuplePartitionComputerFactory(){
-					private static final long serialVersionUID = 1L;
-					@Override
-					public ITuplePartitionComputer createPartitioner() {
-						return new ITuplePartitionComputer(){
-							@Override
-							public int partition(IFrameTupleAccessor accessor,
-									int tIndex, int nParts)
-									throws HyracksDataException {
-								return 0;
-							}
-						};
-					}
-				},
-				fields, 
-				new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
-		jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
-		return finalAggregator;
-	}
-	
-	private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int [] fields, AbstractOperatorDescriptor finalAggregator){
-		LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
-				jobSpec, null);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
-				writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+    private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+            HDFSReadOperatorDescriptor readOperator) {
+        AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+                new StatCountAggregateFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+        return localAggregator;
+    }
 
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
-				jobSpec);
-		jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
-		return writeOperator;
-	}
+    private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor localAggregator) {
+        AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+        // only need one reducer
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+        IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                new ITuplePartitionComputerFactory() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITuplePartitionComputer createPartitioner() {
+                        return new ITuplePartitionComputer() {
+                            @Override
+                            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                                    throws HyracksDataException {
+                                return 0;
+                            }
+                        };
+                    }
+                }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+        jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+        return finalAggregator;
+    }
+
+    private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor finalAggregator) {
+        LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+        return writeOperator;
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
index b469be0..832966b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -7,34 +22,33 @@
 
 public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	@Override
-	public IBinaryComparator createBinaryComparator() {
-		return new IBinaryComparator(){
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
 
-			@Override
-			public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
-					int l2) {
-				return b1[s1]-b2[s2];
-			}
-			
-		};
-	}
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return b1[s1] - b2[s2];
+            }
 
-	@Override
-	public IBinaryHashFunction createBinaryHashFunction() {
-		return new IBinaryHashFunction(){
+        };
+    }
 
-			@Override
-			public int hash(byte[] bytes, int offset, int length) {
-				return bytes[offset];
-			}
-			
-		};
-	}
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        return new IBinaryHashFunction() {
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                return bytes[offset];
+            }
+
+        };
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
index 49a7453..65303a8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.io.DataOutput;
@@ -13,109 +28,102 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class StatCountAggregateFactory implements
-		IAggregatorDescriptorFactory {
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public class CountAggregator implements IAggregatorDescriptor {
+    public class CountAggregator implements IAggregatorDescriptor {
+        private final int[] keyFields;
 
-		@Override
-		public AggregateState createAggregateStates() {
-			// TODO Auto-generated method stub
-			return null;
-		}
+        public CountAggregator(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
 
-		@Override
-		public void init(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = 1;
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when initializing the aggregator.");
-			}
-		}
+        @Override
+        public AggregateState createAggregateStates() {
+            // TODO Auto-generated method stub
+            return null;
+        }
 
-		@Override
-		public void reset() {
-			// TODO Auto-generated method stub
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            int count = 1;
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.");
+            }
+        }
 
-		}
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
 
-		@Override
-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-				AggregateState state) throws HyracksDataException {
-			int count = 1;
+        }
 
-			int statetupleOffset = stateAccessor
-					.getTupleStartOffset(stateTupleIndex);
-			int countfieldStart = stateAccessor.getFieldStartOffset(
-					stateTupleIndex, 1);
-			int countoffset = statetupleOffset
-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            int count = 1;
 
-			byte[] data = stateAccessor.getBuffer().array();
-			count += IntegerSerializerDeserializer.getInt(data, countoffset);
-			IntegerSerializerDeserializer.putInt(count, data, countoffset);
-		}
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		@Override
-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when writing aggregation to the output buffer.");
-			}
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
 
-		}
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+            }
 
-		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-			int tupleOffset = accessor.getTupleStartOffset(tIndex);
-			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
-					+ fieldStart;
-			byte[] data = accessor.getBuffer().array();
+        }
 
-			return IntegerSerializerDeserializer.getInt(data, countoffset);
-		}
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
 
-		@Override
-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			outputPartialResult(tupleBuilder, accessor, tIndex, state);
-		}
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
 
-		@Override
-		public void close() {
-			// TODO Auto-generated method stub
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
+        }
 
-		}
+        @Override
+        public void close() {
+            // TODO Auto-generated method stub
 
-	}
+        }
 
-	@Override
-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int[] keyFields,
-			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		// TODO Auto-generated method stub
-		return new CountAggregator();
-	}
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new CountAggregator(keyFields);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
index 561d64a..2fcca67 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -1,13 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.io.BytesWritable;
-
 import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.KmerUtil;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -17,70 +30,65 @@
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	@Override
-	public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
-			throws HyracksDataException {
-		
-		final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-		final ByteBuffer outputBuffer = ctx.allocateFrame();
-		final FrameTupleAppender outputAppender = new FrameTupleAppender(
-				ctx.getFrameSize());
-		outputAppender.reset(outputBuffer, true);
-		
-		return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+    @Override
+    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+            throws HyracksDataException {
 
-			@Override
-			public void open(IFrameWriter writer) throws HyracksDataException {
-				// TODO Auto-generated method stub
-				
-			}
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
 
-			@Override
-			public void parse(BytesWritable key, KmerCountValue value,
-					IFrameWriter writer) throws HyracksDataException {
-				byte adjMap = value.getAdjBitMap();
-				byte count = value.getCount();
-				InsertToFrame((byte) (GeneCode.inDegree(adjMap)*10+GeneCode.outDegree(adjMap)),count,writer);
-			}
+        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
 
-			@Override
-			public void close(IFrameWriter writer) throws HyracksDataException {
-				FrameUtils.flushFrame(outputBuffer, writer);
-			}
-			
-			private void InsertToFrame(byte degree, byte count,
-					IFrameWriter writer) {
-				try {
-					tupleBuilder.reset();
-					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
-					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+                // TODO Auto-generated method stub
 
-					if (!outputAppender.append(
-							tupleBuilder.getFieldEndOffsets(),
-							tupleBuilder.getByteArray(), 0,
-							tupleBuilder.getSize())) {
-						FrameUtils.flushFrame(outputBuffer, writer);
-						outputAppender.reset(outputBuffer, true);
-						if (!outputAppender.append(
-								tupleBuilder.getFieldEndOffsets(),
-								tupleBuilder.getByteArray(), 0,
-								tupleBuilder.getSize())) {
-							throw new IllegalStateException(
-									"Failed to copy an record into a frame: the record size is too large.");
-						}
-					}
-				} catch (Exception e) {
-					throw new IllegalStateException(e);
-				}
-			}
-		};
-	}
+            }
+
+            @Override
+            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+                    throws HyracksDataException {
+                byte adjMap = value.getAdjBitMap();
+                byte count = value.getCount();
+                InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+
+            private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+                try {
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy an record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
index ce00667..39ac60a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.util;
 
 import java.io.DataOutput;
@@ -15,108 +30,102 @@
 
 public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
 
-	/**
+    /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	public class DistributeAggregatorDescriptor implements
-			IAggregatorDescriptor {
+    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
 
-		@Override
-		public AggregateState createAggregateStates() {
-			// TODO Auto-generated method stub
-			return null;
-		}
+        private final int[] keyFields;
 
-		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-			int tupleOffset = accessor.getTupleStartOffset(tIndex);
-			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
-					+ fieldStart;
-			byte[] data = accessor.getBuffer().array();
-			return IntegerSerializerDeserializer.getInt(data, countoffset);
-		}
+        public DistributeAggregatorDescriptor(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
 
-		@Override
-		public void init(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
+        @Override
+        public AggregateState createAggregateStates() {
+            // TODO Auto-generated method stub
+            return null;
+        }
 
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when initializing the aggregator.");
-			}
-		}
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
 
-		@Override
-		public void reset() {
-			// TODO Auto-generated method stub
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
 
-		}
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.");
+            }
+        }
 
-		@Override
-		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-				AggregateState state) throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
 
-			int statetupleOffset = stateAccessor
-					.getTupleStartOffset(stateTupleIndex);
-			int countfieldStart = stateAccessor.getFieldStartOffset(
-					stateTupleIndex, 1);
-			int countoffset = statetupleOffset
-					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+        }
 
-			byte[] data = stateAccessor.getBuffer().array();
-			count += IntegerSerializerDeserializer.getInt(data, countoffset);
-			IntegerSerializerDeserializer.putInt(count, data, countoffset);
-		}
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
 
-		@Override
-		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			int count = getCount(accessor, tIndex);
-			DataOutput fieldOutput = tupleBuilder.getDataOutput();
-			try {
-				fieldOutput.writeInt(count);
-				tupleBuilder.addFieldEndOffset();
-			} catch (IOException e) {
-				throw new HyracksDataException(
-						"I/O exception when writing aggregation to the output buffer.");
-			}
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
 
-		}
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
 
-		@Override
-		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-				throws HyracksDataException {
-			outputPartialResult(tupleBuilder, accessor, tIndex, state);
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+            }
 
-		}
+        }
 
-		@Override
-		public void close() {
-			// TODO Auto-generated method stub
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
 
-		}
+        }
 
-	}
+        @Override
+        public void close() {
+            // TODO Auto-generated method stub
 
-	@Override
-	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int[] keyFields,
-			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		// TODO Auto-generated method stub
-		return new DistributeAggregatorDescriptor();
-	}
+        }
+
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new DistributeAggregatorDescriptor(keyFields);
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index bef13b5..847272a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.example.jobrun;
 
 import java.io.BufferedWriter;
@@ -194,9 +209,10 @@
                     continue;
                 }
                 SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-                
-//                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER));
+
+                //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
+                        GenomixJob.DEFAULT_KMER));
                 KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
 
                 while (reader.next(key, value)) {
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 22688e0..aa1f791 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -24,39 +24,40 @@
 public class TestUtils {
     /**
      * Compare with the sorted expected file.
-     * The actual file may not be sorted; 
+     * The actual file may not be sorted;
+     * 
      * @param expectedFile
      * @param actualFile
      */
-    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
         BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
         ArrayList<String> actualLines = new ArrayList<String>();
         String lineExpected, lineActual;
-        try{
-                while ( (lineActual = readerActual.readLine())!=null){
-                        actualLines.add(lineActual);
-                }
-                Collections.sort(actualLines);
-                int num = 1;
-                for(String actualLine : actualLines){
-                        lineExpected = readerExpected.readLine();
-                        if (lineExpected == null){
-                                throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
-                        }
-                        if ( !equalStrings(lineExpected, actualLine)){
-                                   throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                               + actualLine);
-                        }
-                ++num;
-                }
+        try {
+            while ((lineActual = readerActual.readLine()) != null) {
+                actualLines.add(lineActual);
+            }
+            Collections.sort(actualLines);
+            int num = 1;
+            for (String actualLine : actualLines) {
                 lineExpected = readerExpected.readLine();
-                if (lineExpected != null) {
-                    throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+                if (lineExpected == null) {
+                    throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
                 }
-        } finally{
-                readerActual.close();
-                readerExpected.close();
+                if (!equalStrings(lineExpected, actualLine)) {
+                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+                            + actualLine);
+                }
+                ++num;
+            }
+            lineExpected = readerExpected.readLine();
+            if (lineExpected != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+            }
+        } finally {
+            readerActual.close();
+            readerExpected.close();
         }
     }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
deleted file mode 100644
index 88c5c35..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ /dev/null
@@ -1,309 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- * 
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- * 
- * succeed node
- *  A 00000001 1
- *  G 00000010 2
- *  C 00000100 4
- *  T 00001000 8
- * precursor node
- *  A 00010000 16
- *  G 00100000 32
- *  C 01000000 64
- *  T 10000000 128
- *  
- * For example, ONE LINE in input file: 00,01,10	0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
- */
-public class LogAlgorithmForMergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{	
-	public static final String KMER_SIZE = "LogAlgorithmForMergeGraphVertex.kmerSize";
-	public static final String ITERATIONS = "MergeGraphVertex.iteration";
-	public static int kmerSize = -1;
-	private int maxIteration = -1;
-	
-	private byte[] tmpVertexId;
-	private byte[] tmpDestVertexId;
-	private BytesWritable destVertexId = new BytesWritable();
-	private byte[] mergeChainVertexId;
-	private int lengthOfMergeChainVertex;
-	private byte tmpVertexValue;
-	private ValueStateWritable tmpVal = new ValueStateWritable();
-	private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
-	/**
-	 * Log Algorithm for path merge graph
-	 */
-	/**
-     *	Load KmerSize, MaxIteration
-     */
-	@Override
-	public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
-		if(kmerSize == -1)
-			kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-        if (maxIteration < 0)
-            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
-		tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
-		tmpVal = getVertexValue();
-		if (getSuperstep() == 1) {
-			tmpMsg.setChainVertexId(new byte[0]);
-			if(GraphVertexOperation.isHeadVertex(tmpVal.getValue())){
-				tmpMsg.setMessage(Message.START);
-				for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
-					if((tmpVal.getValue() & (1 << x)) != 0){
-						tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
-						destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-						sendMsg(destVertexId,tmpMsg);
-					}
-				}
-				voteToHalt();
-			}
-			if(GraphVertexOperation.isRearVertex(tmpVal.getValue())){
-				tmpMsg.setMessage(Message.END);
-				
-				for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
-					if(((tmpVal.getValue()>> 4) & (1 << x)) != 0){
-						tmpDestVertexId = KmerUtil.shiftKmerWithPreCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
-						destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-						sendMsg(destVertexId,tmpMsg);
-					}
-				}
-				voteToHalt();
-			}
-			if(GraphVertexOperation.isPathVertex(tmpVal.getValue())){
-				tmpVal.setState(State.MID_VERTEX);
-				setVertexValue(tmpVal);
-			}
-			/*if(!GraphVertexOperation.isHeadVertex(tmpVal.getValue())
-					&& !GraphVertexOperation.isRearVertex(tmpVal.getValue())
-					&& !GraphVertexOperation.isRearVertex(tmpVal.getValue()))
-				voteToHalt();*/
-		}
-		else if(getSuperstep() == 2 && getSuperstep() <= maxIteration){
-			while(msgIterator.hasNext()){
-				if(!GraphVertexOperation.isPathVertex(tmpVal.getValue())){
-					msgIterator.next();
-					voteToHalt();
-				}
-				else{
-					tmpMsg = msgIterator.next();
-					if(tmpMsg.getMessage() == Message.START && 
-							(tmpVal.getState() == State.MID_VERTEX || tmpVal.getState() == State.END_VERTEX)){
-						tmpVal.setState(State.START_VERTEX);
-						setVertexValue(tmpVal);
-					}
-					else if(tmpMsg.getMessage() == Message.END && tmpVal.getState() == State.MID_VERTEX){
-						tmpVal.setState(State.END_VERTEX);
-						setVertexValue(tmpVal);
-						voteToHalt();
-					}
-					else
-						voteToHalt();
-				}
-			}
-		}
-		//head node sends message to path node
-		else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
-			if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
-				voteToHalt();
-			else{
-				if(getSuperstep() == 3){
-					tmpMsg = new LogAlgorithmMessageWritable();
-					if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1)
-						voteToHalt();
-					else{
-						tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 
-								0, tmpVertexId.length, 
-								Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
-						destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-						if(tmpVal.getState() == State.START_VERTEX){
-							tmpMsg.setMessage(Message.START);
-							tmpMsg.setSourceVertexId(getVertexId().getBytes());
-							sendMsg(destVertexId, tmpMsg);
-							voteToHalt();
-						}
-						else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
-							tmpMsg.setMessage(Message.NON);
-							tmpMsg.setSourceVertexId(getVertexId().getBytes());
-							sendMsg(destVertexId,tmpMsg);
-							voteToHalt();
-						}
-					}
-				}
-				else{
-					if(msgIterator.hasNext()){
-						tmpMsg = msgIterator.next();
-						byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
-								tmpVal.getLengthOfMergeChain(),
-								tmpVal.getMergeChain(),
-								0, tmpVal.getMergeChain().length);
-						if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1 || lastKmer == null)
-							voteToHalt();
-						else{
-							tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer, 
-									0, lastKmer.length,
-									Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F))); 
-							destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-							if(tmpVal.getState() == State.START_VERTEX){
-								tmpMsg.setMessage(Message.START);
-								tmpMsg.setSourceVertexId(getVertexId().getBytes());
-								sendMsg(destVertexId, tmpMsg);
-								voteToHalt();
-							}
-							else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
-								tmpMsg.setMessage(Message.NON);
-								tmpMsg.setSourceVertexId(getVertexId().getBytes());
-								sendMsg(destVertexId,tmpMsg);
-							}
-						}
-					}
-				}
-			}
-		}
-		
-		//path node sends message back to head node
-		else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
-			if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
-				voteToHalt();
-			else{
-				if(msgIterator.hasNext()){
-					tmpMsg = msgIterator.next();
-					int message = tmpMsg.getMessage();
-					if(tmpVal.getLengthOfMergeChain() == 0){
-						tmpVal.setLengthOfMergeChain(kmerSize);
-						tmpVal.setMergeChain(tmpVertexId);
-						setVertexValue(tmpVal);
-					}
-					tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
-					tmpMsg.setChainVertexId(tmpVal.getMergeChain());
-					
-					tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
-					tmpMsg.setSourceVertexState(tmpVal.getState());
-					
-					//kill Message because it has been merged by the head
-					if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE){
-						tmpMsg.setMessage(Message.END);
-						tmpVal.setState(State.FINAL_DELETE);
-						setVertexValue(tmpVal);
-						//deleteVertex(getVertexId());
-					}
-					else
-						tmpMsg.setMessage(Message.NON);
-					
-					if(message == Message.START){
-						tmpVal.setState(State.TODELETE);
-						setVertexValue(tmpVal);
-					}
-					destVertexId.set(tmpMsg.getSourceVertexId(), 0, tmpMsg.getSourceVertexId().length);
-					sendMsg(destVertexId,tmpMsg);
-					//voteToHalt();
-				}
-				else{
-					if(getVertexValue().getState() != State.START_VERTEX //&& getVertexValue().getState() != State.NON_EXIST
-							&& getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
-						tmpVal.setState(State.KILL_SELF);
-						setVertexValue(tmpVal);
-						voteToHalt();
-						//deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
-					}
-				}
-			}
-		}
-		else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
-			if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
-				voteToHalt(); //deleteVertex(getVertexId()); //killSelf
-			else{
-				if(msgIterator.hasNext()){
-					tmpMsg = msgIterator.next();
-
-					if(tmpMsg.getMessage() == Message.END){
-						if(tmpVal.getState() != State.START_VERTEX)
-							tmpVal.setState(State.END_VERTEX);
-						else
-							tmpVal.setState(State.FINAL_VERTEX);
-					}
-						
-					if(getSuperstep() == 5){
-						lengthOfMergeChainVertex = kmerSize;
-						mergeChainVertexId = tmpVertexId;
-					}
-					else{
-						lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain(); 
-						mergeChainVertexId = tmpVal.getMergeChain(); 
-					}
-					byte[] tmplastKmer = KmerUtil.getLastKmerFromChain(tmpMsg.getLengthOfChain() - kmerSize + 1,
-							tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(),0, tmpMsg.getChainVertexId().length);
-					mergeChainVertexId = KmerUtil.mergeTwoKmer(lengthOfMergeChainVertex, 
-							mergeChainVertexId, 
-							0, mergeChainVertexId.length,
-							tmpMsg.getLengthOfChain() - kmerSize + 1, 
-							tmplastKmer, 0, tmplastKmer.length);
-					lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
-							- kmerSize + 1;
-					tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
-					tmpVal.setMergeChain(mergeChainVertexId);
-
-					tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpMsg.getNeighberInfo());
-					tmpVal.setValue(tmpVertexValue);
-					if(tmpMsg.getMessage() != Message.END){
-						setVertexValue(tmpVal);
-						tmpMsg = new LogAlgorithmMessageWritable(); //reset
-						tmpMsg.setNeighberInfo(tmpVertexValue);
-						sendMsg(getVertexId(),tmpMsg);
-					}
-				}
-				if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE)
-					voteToHalt();
-				if(tmpVal.getState() == State.FINAL_VERTEX){
-					//String source = Kmer.recoverKmerFrom(tmpVal.getLengthOfMergeChain(), tmpVal.getMergeChain(), 0, tmpVal.getMergeChain().length);
-					voteToHalt();
-				}
-			}
-		}
-	}
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) throws Exception {
-        PregelixJob job = new PregelixJob(LogAlgorithmForMergeGraphVertex.class.getSimpleName());
-        job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
-        /**
-         * BinaryInput and BinaryOutput~/
-         */
-        job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class); 
-        job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class); 
-        job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(ValueStateWritable.class);
-        job.setDynamicVertexValueSize(true);
-        Client.run(args, job);
-	}
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
deleted file mode 100644
index ec45019..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- * 
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- * 
- * succeed node
- *  A 00000001 1
- *  G 00000010 2
- *  C 00000100 4
- *  T 00001000 8
- * precursor node
- *  A 00010000 16
- *  G 00100000 32
- *  C 01000000 64
- *  T 10000000 128
- *  
- * For example, ONE LINE in input file: 00,01,10	0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
- */
-public class MergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
-	
-	public static final String KMER_SIZE = "MergeGraphVertex.kmerSize";
-	public static final String ITERATIONS = "MergeGraphVertex.iteration";
-	public static int kmerSize = -1;
-	private int maxIteration = -1;
-	
-    private byte[] tmpVertexId;
-    private byte[] tmpDestVertexId;
-	private BytesWritable destVertexId = new BytesWritable();
-	private BytesWritable tmpChainVertexId = new BytesWritable();
-	private ValueStateWritable tmpVertexValue = new ValueStateWritable();
-	private MessageWritable tmpMsg = new MessageWritable();
-	/**
-	 * Naive Algorithm for path merge graph
-	 * @throws Exception 
-	 * @throws  
-	 */
-	/**
-     *	Load KmerSize, MaxIteration
-     */
-	@Override
-	public void compute(Iterator<MessageWritable> msgIterator) {
-		if(kmerSize == -1)
-			kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-        if (maxIteration < 0) 
-            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
-		tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
-		if (getSuperstep() == 1) {
-			if(GraphVertexOperation.isHeadVertex(getVertexValue().getValue())){ 
-				tmpMsg.setSourceVertexId(tmpVertexId);
-				tmpMsg.setHead(tmpVertexId);
-				tmpMsg.setLengthOfChain(0);
-				tmpMsg.setChainVertexId(tmpChainVertexId.getBytes());
-				for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
-					if((getVertexValue().getValue() & (1 << x)) != 0){
-						tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
-						destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-						sendMsg(destVertexId,tmpMsg);
-					}
-				}
-			}
-		}
-		
-		//path node sends message back to head node
-		else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
-			 if(msgIterator.hasNext()){
-				tmpMsg = msgIterator.next();
-					
-				if(!tmpMsg.isRear()){
-					if(getSuperstep() == 2)
-						tmpMsg.setHead(tmpVertexId);
-					if(GraphVertexOperation.isPathVertex(getVertexValue().getValue())){
-						tmpDestVertexId = tmpMsg.getSourceVertexId();
-						tmpMsg.setNeighberInfo(getVertexValue().getValue()); //set neighber
-						if(tmpMsg.getLengthOfChain() == 0){
-							tmpMsg.setLengthOfChain(kmerSize);
-							tmpMsg.setChainVertexId(tmpVertexId);
-						}
-						else{
-							String source = Kmer.recoverKmerFrom(kmerSize, tmpVertexId, 0, tmpVertexId.length);
-							tmpMsg.setChainVertexId(KmerUtil.mergeKmerWithNextCode(
-									tmpMsg.getLengthOfChain(),
-									tmpMsg.getChainVertexId(), 
-									0, tmpMsg.getChainVertexId().length,
-									Kmer.GENE_CODE.getCodeFromSymbol((byte)source.charAt(source.length() - 1))));
-							tmpMsg.incrementLength();
-							deleteVertex(getVertexId());
-						}
-						destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-						sendMsg(destVertexId,tmpMsg);
-					}
-					else if(GraphVertexOperation.isRearVertex(getVertexValue().getValue())){
-						if(getSuperstep() == 2)
-							voteToHalt();
-						else{
-							tmpDestVertexId = tmpMsg.getSourceVertexId();
-							tmpMsg.setSourceVertexId(tmpVertexId);
-							tmpMsg.setRear(true);
-							destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-							sendMsg(destVertexId,tmpMsg);
-						}
-					}
-				}
-				else{
-					tmpVertexValue.setState(State.START_VERTEX);
-					tmpVertexValue.setValue(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getValue(),
-							tmpMsg.getSourceVertexId(), kmerSize));
-					tmpVertexValue.setLengthOfMergeChain(tmpMsg.getLengthOfChain());
-					tmpVertexValue.setMergeChain(tmpMsg.getChainVertexId());
-					setVertexValue(tmpVertexValue);
-					//String source = Kmer.recoverKmerFrom(tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(), 0, tmpMsg.getChainVertexId().length);
-					//System.out.print("");
-					/*try {
-						
-						GraphVertexOperation.flushChainToFile(tmpMsg.getChainVertexId(), 
-								tmpMsg.getLengthOfChain(),tmpVertexId);
-					} catch (IOException e) { e.printStackTrace(); }*/
-				}
-			}
-		}
-		//head node sends message to path node
-		else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
-			while (msgIterator.hasNext()){
-				tmpMsg = msgIterator.next();
-				if(!tmpMsg.isRear()){
-					byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
-							tmpMsg.getLengthOfChain(),
-							tmpMsg.getChainVertexId(),
-							0, tmpMsg.getChainVertexId().length);
-					tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer, 
-							0, lastKmer.length,
-							Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpMsg.getNeighberInfo() & 0x0F)));
-
-					tmpMsg.setSourceVertexId(tmpVertexId);
-					destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-					sendMsg(destVertexId,tmpMsg);
-				}
-				else{	
-					tmpDestVertexId = tmpMsg.getHead();
-					destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
-					sendMsg(destVertexId,tmpMsg);
-				}
-			}
-		}
-		voteToHalt();
-	}
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) throws Exception {
-        PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
-        job.setVertexClass(MergeGraphVertex.class);
-        /**
-         * BinaryInput and BinaryOutput
-         */
-        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class); 
-        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class); 
-        job.setDynamicVertexValueSize(true);
-        job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(ValueStateWritable.class);
-        Client.run(args, job);
-	}
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index 823a984..e1868b1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -3,7 +3,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -14,12 +13,14 @@
 
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
 
 public class BinaryVertexInputFormat <I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
 	extends VertexInputFormat<I, V, E, M>{
 	
     /** Uses the SequenceFileInputFormat to do everything */
+	@SuppressWarnings("rawtypes")
 	protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
     
     /**
@@ -37,7 +38,7 @@
     public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
             implements VertexReader<I, V, E, M> {
         /** Internal line record reader */
-        private final RecordReader<BytesWritable,KmerCountValue> lineRecordReader;
+        private final RecordReader<KmerBytesWritable,KmerCountValue> lineRecordReader;
         /** Context passed to initialize */
         private TaskAttemptContext context;
 
@@ -47,7 +48,7 @@
          * @param recordReader
          *            Line record reader from SequenceFileInputFormat
          */
-        public BinaryVertexReader(RecordReader<BytesWritable, KmerCountValue> recordReader) {
+        public BinaryVertexReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
             this.lineRecordReader = recordReader;
         }
 
@@ -73,7 +74,7 @@
          * 
          * @return Record reader to be used for reading.
          */
-        protected RecordReader<BytesWritable,KmerCountValue> getRecordReader() {
+        protected RecordReader<KmerBytesWritable,KmerCountValue> getRecordReader() {
             return lineRecordReader;
         }
 
@@ -87,7 +88,8 @@
         }
     }
 
-    @Override
+    @SuppressWarnings("unchecked")
+	@Override
     public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
         // Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
         // to do this for us
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index f497f21..1435770 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -12,6 +11,7 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 
@@ -49,7 +49,7 @@
         /** Context passed to initialize */
         private TaskAttemptContext context;
         /** Internal line record writer */
-        private final RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter;
+        private final RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter;
 
         /**
          * Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
          * @param lineRecordWriter
          *            Line record writer from SequenceFileOutputFormat
          */
-        public BinaryVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
+        public BinaryVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
             this.lineRecordWriter = lineRecordWriter;
         }
 
@@ -76,7 +76,7 @@
          * 
          * @return Record writer to be used for writing.
          */
-        public RecordWriter<BytesWritable, ValueStateWritable> getRecordWriter() {
+        public RecordWriter<KmerBytesWritable, ValueStateWritable> getRecordWriter() {
             return lineRecordWriter;
         }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 2d70fdf..60342a7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,8 +10,8 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
@@ -60,11 +60,11 @@
         for (int i = 1; i < inputs.length; i++)
             FileInputFormat.addInputPaths(job, inputs[0]);
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
-        job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, options.sizeKmer);
-        job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, options.sizeKmer);
+        job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+        job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         if (options.numIteration > 0){
-            job.getConfiguration().setLong(MergeGraphVertex.ITERATIONS, options.numIteration);
-            //job.getConfiguration().setLong(LogAlgorithmForMergeGraphVertex.ITERATIONS, options.numIteration);
+            job.getConfiguration().setLong(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+            job.getConfiguration().setLong(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
         }
         return options;
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
deleted file mode 100644
index 2b87379..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class BinaryLoadGraphOutputFormat extends 
-	BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
-
-        @Override
-        public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
-            return new BinaryLoadGraphVertexWriter(recordWriter);
-        }
-        
-        /**
-         * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
-         */
-        public static class BinaryLoadGraphVertexWriter extends
-                BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
-            public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
-                super(lineRecordWriter);
-            }
-
-            @Override
-            public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
-                    InterruptedException {
-                getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
-            }
-        }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
similarity index 66%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 0e74c2d..7898d35 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -12,31 +11,32 @@
 import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
 import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 
-public class LogAlgorithmForMergeGraphInputFormat extends
-	BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
+public class LogAlgorithmForPathMergeInputFormat extends
+	BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
 	/**
 	 * Format INPUT
 	 */
-    @Override
-    public VertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+    @SuppressWarnings("unchecked")
+	@Override
+    public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
             InputSplit split, TaskAttemptContext context) throws IOException {
         return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
     }
     
     @SuppressWarnings("rawtypes")
     class BinaryLoadGraphReader extends
-            BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+            BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
         private Vertex vertex;
-        private BytesWritable vertexId = new BytesWritable();
+        private KmerBytesWritable vertexId = null;
         private ValueStateWritable vertexValue = new ValueStateWritable();
 
-        public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+        public BinaryLoadGraphReader(RecordReader<KmerBytesWritable,KmerCountValue> recordReader) {
             super(recordReader);
         }
 
@@ -47,7 +47,7 @@
 
         @SuppressWarnings("unchecked")
         @Override
-        public Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+        public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
                 InterruptedException {
             if (vertex == null)
                 vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -55,19 +55,19 @@
             vertex.getMsgList().clear();
             vertex.getEdges().clear();
             
-            
             if(getRecordReader() != null){
 	            /**
 	             * set the src vertex id
 	             */
-	            
-        		vertexId = getRecordReader().getCurrentKey();
+            	if(vertexId == null)
+            		vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
+        		vertexId.set(getRecordReader().getCurrentKey());
         		vertex.setVertexId(vertexId);
 	            /**
 	             * set the vertex value
 	             */
 	            KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
-	            vertexValue.setValue(kmerCountValue.getAdjBitMap()); 
+	            vertexValue.setAdjMap(kmerCountValue.getAdjBitMap()); 
 	            vertexValue.setState(State.NON_VERTEX);
 	            vertex.setVertexValue(vertexValue);
             }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
similarity index 60%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 865a787..29c5ccb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -2,27 +2,26 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
 import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
 import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 
-public class LogAlgorithmForMergeGraphOutputFormat extends 
-	BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
+public class LogAlgorithmForPathMergeOutputFormat extends 
+	BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
 
 		
         @Override
-        public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+        public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
                 throws IOException, InterruptedException {
-            RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+            @SuppressWarnings("unchecked")
+			RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
             return new BinaryLoadGraphVertexWriter(recordWriter);
         }
         
@@ -30,19 +29,20 @@
          * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
          */
         public static class BinaryLoadGraphVertexWriter extends
-                BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
+                BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
         	
-            public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
+            public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
                 super(lineRecordWriter);
             }
 
             @Override
-            public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+            public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
                     InterruptedException {
             	if(vertex.getVertexValue().getState() != State.FINAL_DELETE
             			&& vertex.getVertexValue().getState() != State.END_VERTEX
             			&& vertex.getVertexValue().getState() != State.TODELETE
-            			&& vertex.getVertexValue().getState() != State.KILL_SELF)
+            			&& vertex.getVertexValue().getState() != State.KILL_SELF
+            			&& vertex.getVertexValue().getState() != State.NON_EXIST)
                     getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
             }
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
similarity index 63%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 4cd22ac..ca134c0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -1,35 +1,30 @@
 package edu.uci.ics.genomix.pregelix.format;
 
 import java.io.IOException;
-import java.util.logging.FileHandler;
-import java.util.logging.Logger;
 
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueWritable;
-import edu.uci.ics.genomix.pregelix.log.DataLoadLogFormatter;
 import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
 import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
 
-public class BinaryLoadGraphInputFormat extends
-	BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
+public class NaiveAlgorithmForPathMergeInputFormat extends
+	BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
 	/**
 	 * Format INPUT
 	 */
-    @Override
-    public VertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+    @SuppressWarnings("unchecked")
+	@Override
+    public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> createVertexReader(
             InputSplit split, TaskAttemptContext context) throws IOException {
         return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
     }	
@@ -37,12 +32,12 @@
 
 @SuppressWarnings("rawtypes")
 class BinaryLoadGraphReader extends
-        BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> {
-    private Vertex vertex;
-    private BytesWritable vertexId = new BytesWritable();
+        BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+	private Vertex vertex;
+    private KmerBytesWritable vertexId = null;
     private ValueStateWritable vertexValue = new ValueStateWritable();
 
-    public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+    public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
         super(recordReader);
     }
 
@@ -53,7 +48,7 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+    public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> getCurrentVertex() throws IOException,
             InterruptedException {
         if (vertex == null)
             vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -66,13 +61,15 @@
             /**
              * set the src vertex id
              */
+        	if(vertexId == null)
+        		vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
     		vertexId.set(getRecordReader().getCurrentKey());
     		vertex.setVertexId(vertexId);
             /**
              * set the vertex value
              */
             KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
-            vertexValue.setValue(kmerCountValue.getAdjBitMap()); 
+            vertexValue.setAdjMap(kmerCountValue.getAdjBitMap()); 
             vertex.setVertexValue(vertexValue);
         }
         
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
new file mode 100644
index 0000000..482014e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class NaiveAlgorithmForPathMergeOutputFormat extends 
+	BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+        @Override
+        public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            @SuppressWarnings("unchecked")
+			RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+            return new BinaryLoadGraphVertexWriter(recordWriter);
+        }
+        
+        /**
+         * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+         */
+        public static class BinaryLoadGraphVertexWriter extends
+                BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+            public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+                super(lineRecordWriter);
+            }
+
+            @Override
+            public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+                    InterruptedException {
+            	if(vertex.getVertexValue().getState() != State.NON_EXIST)
+            		getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+            }
+        }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 3d07c5b..ac84d8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -2,11 +2,12 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
+
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
 	/**
@@ -15,65 +16,55 @@
 	 * chainVertexId stores the chains of connected DNA
 	 * file stores the point to the file that stores the chains of connected DNA
 	 */
-	private byte[] sourceVertexId;
-	private byte neighberInfo;
-	private int lengthOfChain;
-	private byte[] chainVertexId;
-	private File file;
+	private VKmerBytesWritable sourceVertexId;
+	private VKmerBytesWritable chainVertexId;
+	private byte adjMap;
 	private int message;
 	private int sourceVertexState;
 	
 	public LogAlgorithmMessageWritable(){
-		sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
+		sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+		chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
 	}
 	
-	public void set(byte[] sourceVertexId,byte neighberInfo, byte[] chainVertexId, File file){
-		this.sourceVertexId = sourceVertexId;
-		this.chainVertexId = chainVertexId;
-		this.file = file;
-		this.message = 0;
-		this.lengthOfChain = 0;
+	public void set(VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
+		this.sourceVertexId.set(sourceVertexId);
+		this.chainVertexId.set(chainVertexId);
+		this.adjMap = adjMap;
+		this.message = message;
+		this.sourceVertexState = sourceVertexState;
 	}
 	
 	public void reset(){
-		sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
-		neighberInfo = (Byte) null;
-		lengthOfChain = 0;
-		chainVertexId = null;
+		//sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+		chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+		adjMap = (byte)0;
 		message = 0;
 		sourceVertexState = 0;
 	}
 
-	public byte[] getSourceVertexId() {
+	public VKmerBytesWritable getSourceVertexId() {
 		return sourceVertexId;
 	}
 
-	public void setSourceVertexId(byte[] sourceVertexId) {
-		this.sourceVertexId = sourceVertexId;
+	public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
+		this.sourceVertexId.set(sourceVertexId);
 	}
 
-	public byte getNeighberInfo() {
-		return neighberInfo;
+	public byte getAdjMap() {
+		return adjMap;
 	}
 
-	public void setNeighberInfo(byte neighberInfo) {
-		this.neighberInfo = neighberInfo;
+	public void setAdjMap(byte adjMap) {
+		this.adjMap = adjMap;
 	}
 
-	public byte[] getChainVertexId() {
+	public VKmerBytesWritable getChainVertexId() {
 		return chainVertexId;
 	}
 
-	public void setChainVertexId(byte[] chainVertexId) {
-		this.chainVertexId = chainVertexId;
-	}
-
-	public File getFile() {
-		return file;
-	}
-
-	public void setFile(File file) {
-		this.file = file;
+	public void setChainVertexId(VKmerBytesWritable chainVertexId) {
+		this.chainVertexId.set(chainVertexId);
 	}
 
 	public int getMessage() {
@@ -93,84 +84,48 @@
 	}
 
 	public int getLengthOfChain() {
-		return lengthOfChain;
-	}
-
-	public void setLengthOfChain(int lengthOfChain) {
-		this.lengthOfChain = lengthOfChain;
-	}
-
-	public void incrementLength(){
-		this.lengthOfChain++;
+		return chainVertexId.getKmerLength();
 	}
 	
 	@Override
 	public void write(DataOutput out) throws IOException {
-		// TODO Auto-generated method stub
-		out.writeInt(lengthOfChain);
-		if(lengthOfChain != 0)
-			out.write(chainVertexId);
-
+		sourceVertexId.write(out);
+		chainVertexId.write(out);
+		out.write(adjMap);
 		out.writeInt(message);
 		out.writeInt(sourceVertexState);
-		
-		out.write(sourceVertexId); 
-		out.write(neighberInfo);
 	}
 
 	@Override
 	public void readFields(DataInput in) throws IOException {
-		// TODO Auto-generated method stub
-		lengthOfChain = in.readInt();
-		if(lengthOfChain > 0){
-			chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
-			in.readFully(chainVertexId);
-		}
-		else
-			chainVertexId = new byte[0];
-
+		sourceVertexId.readFields(in);
+		chainVertexId.readFields(in);
+		adjMap = in.readByte();
 		message = in.readInt();
 		sourceVertexState = in.readInt();
-		
-		sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
-		in.readFully(sourceVertexId);
-		neighberInfo = in.readByte();
 	}
 
-    @Override
+	 @Override
     public int hashCode() {
-    	int hashCode = 0;
-    	for(int i = 0; i < chainVertexId.length; i++)
-    		hashCode = (int)chainVertexId[i];
-        return hashCode;
+        return chainVertexId.hashCode();
     }
+	 
     @Override
     public boolean equals(Object o) {
-        if (o instanceof LogAlgorithmMessageWritable) {
+        if (o instanceof NaiveAlgorithmMessageWritable) {
         	LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
-            return chainVertexId == tp.chainVertexId && file == tp.file;
+            return chainVertexId.equals(tp.chainVertexId);
         }
         return false;
     }
+    
     @Override
     public String toString() {
-        return chainVertexId.toString() + "\t" + file.getAbsolutePath();
+        return chainVertexId.toString();
     }
-    @Override
+    
+	@Override
 	public int compareTo(LogAlgorithmMessageWritable tp) {
-		// TODO Auto-generated method stub
-        int cmp;
-        if (chainVertexId == tp.chainVertexId)
-            cmp = 0;
-        else
-            cmp = 1;
-        if (cmp != 0)
-            return cmp;
-        if (file == tp.file)
-            return 0;
-        else
-            return 1;
+		return chainVertexId.compareTo(tp.chainVertexId);
 	}
-
-
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
deleted file mode 100644
index 3872514..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
-
-public class MessageWritable implements WritableComparable<MessageWritable>{
-	/**
-	 * sourceVertexId stores source vertexId when headVertex sends the message
-	 * 				  stores neighber vertexValue when pathVertex sends the message
-	 * chainVertexId stores the chains of connected DNA
-	 * file stores the point to the file that stores the chains of connected DNA
-	 */
-	private byte[] sourceVertexId;
-	private byte neighberInfo;
-	private byte[] chainVertexId;
-	private File file;
-	private boolean isRear;
-	private int lengthOfChain;
-	private byte[] head;
-	
-	public MessageWritable(){		
-	}
-	
-	public void set(byte[] sourceVertexId, byte neighberInfo, byte[] chainVertexId, File file, byte[] head){
-		this.sourceVertexId = sourceVertexId;
-		this.neighberInfo = neighberInfo;
-		this.chainVertexId = chainVertexId;
-		this.file = file;
-		this.isRear = false;
-		this.lengthOfChain = 0;
-		this.head = head;
-	}
-
-	public byte[] getSourceVertexId() {
-		return sourceVertexId;
-	}
-
-	public void setSourceVertexId(byte[] sourceVertexId) {
-		this.sourceVertexId = sourceVertexId;
-	}
-
-	public byte getNeighberInfo() {
-		return neighberInfo;
-	}
-
-	public void setNeighberInfo(byte neighberInfo) {
-		this.neighberInfo = neighberInfo;
-	}
-
-	public byte[] getChainVertexId() {
-		return chainVertexId;
-	}
-
-	public void setChainVertexId(byte[] chainVertexId) {
-		this.chainVertexId = chainVertexId;
-	}
-
-	public File getFile() {
-		return file;
-	}
-
-	public void setFile(File file) {
-		this.file = file;
-	}
-
-	public boolean isRear() {
-		return isRear;
-	}
-
-	public void setRear(boolean isRear) {
-		this.isRear = isRear;
-	}
-
-	public int getLengthOfChain() {
-		return lengthOfChain;
-	}
-
-	public void setLengthOfChain(int lengthOfChain) {
-		this.lengthOfChain = lengthOfChain;
-	}
-	
-
-	public byte[] getHead() {
-		return head;
-	}
-
-	public void setHead(byte[] head) {
-		this.head = head;
-	}
-
-	public void incrementLength(){
-		this.lengthOfChain++;
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		// TODO Auto-generated method stub
-		out.writeInt(lengthOfChain);
-		if(lengthOfChain != 0)
-			out.write(chainVertexId);
-		out.write(sourceVertexId);
-		out.write(head);
-		out.write(neighberInfo);
-		out.writeBoolean(isRear);
-	}
-
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		// TODO Auto-generated method stub
-		lengthOfChain = in.readInt();
-		if(lengthOfChain > 0){
-			chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
-			in.readFully(chainVertexId);
-		}
-		else
-			chainVertexId = new byte[0];
-		sourceVertexId = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
-		in.readFully(sourceVertexId);
-		head = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
-		in.readFully(head);
-		neighberInfo = in.readByte();
-		isRear = in.readBoolean();
-
-	}
-
-    @Override
-    public int hashCode() {
-    	int hashCode = 0;
-    	for(int i = 0; i < chainVertexId.length; i++)
-    		hashCode = (int)chainVertexId[i];
-        return hashCode;
-    }
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof MessageWritable) {
-        	MessageWritable tp = (MessageWritable) o;
-            return chainVertexId == tp.chainVertexId && file == tp.file;
-        }
-        return false;
-    }
-    @Override
-    public String toString() {
-        return chainVertexId.toString() + "\t" + file.getAbsolutePath();
-    }
-    
-	@Override
-	public int compareTo(MessageWritable tp) {
-		// TODO Auto-generated method stub
-        int cmp;
-        if (chainVertexId == tp.chainVertexId)
-            cmp = 0;
-        else
-            cmp = 1;
-        if (cmp != 0)
-            return cmp;
-        if (file == tp.file)
-            return 0;
-        else
-            return 1;
-	}
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
new file mode 100644
index 0000000..55a626d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -0,0 +1,125 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class NaiveAlgorithmMessageWritable implements WritableComparable<NaiveAlgorithmMessageWritable>{
+	/**
+	 * sourceVertexId stores source vertexId when headVertex sends the message
+	 * 				  stores neighber vertexValue when pathVertex sends the message
+	 * chainVertexId stores the chains of connected DNA
+	 * file stores the point to the file that stores the chains of connected DNA
+	 */
+	private KmerBytesWritable sourceVertexId;
+	private VKmerBytesWritable chainVertexId;
+	private KmerBytesWritable headVertexId;
+	private byte adjMap;
+	private boolean isRear;
+	
+	public NaiveAlgorithmMessageWritable(){
+		sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+		chainVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+		headVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+	}
+	
+	public void set(KmerBytesWritable sourceVertex, VKmerBytesWritable chainVertex, KmerBytesWritable headVertex , byte adjMap, boolean isRear){
+		this.sourceVertexId.set(sourceVertex);
+		this.chainVertexId.set(chainVertex);
+		this.headVertexId.set(headVertex);
+		this.adjMap = adjMap;
+		this.isRear = isRear;
+	}
+
+	public KmerBytesWritable getSourceVertexId() {
+		return sourceVertexId;
+	}
+
+	public void setSourceVertexId(KmerBytesWritable source) {
+		this.sourceVertexId.set(source);
+	}
+
+	public byte getAdjMap() {
+		return adjMap;
+	}
+
+	public void setAdjMap(byte adjMap) {
+		this.adjMap = adjMap;
+	}
+
+	public void setChainVertexId(VKmerBytesWritable chainVertex) {
+		this.chainVertexId.set(chainVertex);
+	}
+
+	public VKmerBytesWritable getChainVertexId() {
+		return chainVertexId;
+	}
+
+	public boolean isRear() {
+		return isRear;
+	}
+
+	public void setRear(boolean isRear) {
+		this.isRear = isRear;
+	}
+
+	public int getLengthOfChain() {
+		return this.chainVertexId.getKmerLength();
+	}
+	
+	
+	public KmerBytesWritable getHeadVertexId() {
+		return headVertexId;
+	}
+
+	public void setHeadVertexId(KmerBytesWritable headVertexId) {
+		this.headVertexId.set(headVertexId);
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		sourceVertexId.write(out);
+		headVertexId.write(out);
+		chainVertexId.write(out);
+		out.write(adjMap);
+		out.writeBoolean(isRear);
+	}
+
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		sourceVertexId.readFields(in);
+		headVertexId.readFields(in);
+		chainVertexId.readFields(in);
+		adjMap = in.readByte();
+		isRear = in.readBoolean();
+	}
+
+    @Override
+    public int hashCode() {
+        return chainVertexId.hashCode();
+    }
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof NaiveAlgorithmMessageWritable) {
+        	NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
+            return chainVertexId.equals( tp.chainVertexId);
+        }
+        return false;
+    }
+    @Override
+    public String toString() {
+        return chainVertexId.toString();
+    }
+    
+	@Override
+	public int compareTo(NaiveAlgorithmMessageWritable tp) {
+		return chainVertexId.compareTo(tp.chainVertexId);
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index ffc1d38..769277c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -4,35 +4,42 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 
 public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
 
-	private byte value;
+	private byte adjMap;
 	private int state;
-	private int lengthOfMergeChain;
-	private byte[] mergeChain;
+	private VKmerBytesWritable mergeChain;
 
 	public ValueStateWritable() {
 		state = State.NON_VERTEX;
-		lengthOfMergeChain = 0;
+		mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
 	}
 
-	public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
-		this.value = value;
+	public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
+		this.adjMap = adjMap;
 		this.state = state;
-		this.lengthOfMergeChain = lengthOfMergeChain;
-		this.mergeChain = mergeChain;
+		this.mergeChain.set(mergeChain);
+	}
+	
+	public void set(byte adjMap, int state, VKmerBytesWritable mergeChain){
+		this.adjMap = adjMap;
+		this.state = state;
+		this.mergeChain.set(mergeChain);
 	}
 
-	public byte getValue() {
-		return value;
+	public byte getAdjMap() {
+		return adjMap;
 	}
 
-	public void setValue(byte value) {
-		this.value = value;
+	public void setAdjMap(byte adjMap) {
+		this.adjMap = adjMap;
 	}
 
 	public int getState() {
@@ -44,43 +51,33 @@
 	}
 
 	public int getLengthOfMergeChain() {
-		return lengthOfMergeChain;
+		return mergeChain.getKmerLength();
 	}
 
-	public void setLengthOfMergeChain(int lengthOfMergeChain) {
-		this.lengthOfMergeChain = lengthOfMergeChain;
-	}
-
-	public byte[] getMergeChain() {
+	public VKmerBytesWritable getMergeChain() {
 		return mergeChain;
 	}
 
-	public void setMergeChain(byte[] mergeChain) {
-		this.mergeChain = mergeChain;
+	public void setMergeChain(KmerBytesWritable mergeChain) {
+		this.mergeChain.set(mergeChain);
+	}
+	
+	public void setMergeChain(VKmerBytesWritable mergeChain) {
+		this.mergeChain.set(mergeChain);
 	}
 
 	@Override
 	public void readFields(DataInput in) throws IOException {
-		value = in.readByte();
+		adjMap = in.readByte();
 		state = in.readInt();
-		lengthOfMergeChain = in.readInt();
-		if(lengthOfMergeChain < 0)
-			System.out.println();
-		if(lengthOfMergeChain != 0){
-			mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
-			in.readFully(mergeChain);
-		}
-		else
-			mergeChain = new byte[0];
+		mergeChain.readFields(in);
 	}
 
 	@Override
 	public void write(DataOutput out) throws IOException {
-		out.writeByte(value);
+		out.writeByte(adjMap);
 		out.writeInt(state);
-		out.writeInt(lengthOfMergeChain);
-		if(lengthOfMergeChain != 0)
-			out.write(mergeChain);
+		mergeChain.write(out);
 	}
 
 	@Override
@@ -91,11 +88,11 @@
 	
 	@Override
 	public String toString() {
-		if(lengthOfMergeChain == 0)
-			return Kmer.GENE_CODE.getSymbolFromBitMap(value);
-		return 	Kmer.GENE_CODE.getSymbolFromBitMap(value) + "\t" +
-				lengthOfMergeChain + "\t" +
-				Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length) + "\t" +
+		if(mergeChain.getKmerLength() == 0)
+			return GeneCode.getSymbolFromBitMap(adjMap);
+		return 	GeneCode.getSymbolFromBitMap(adjMap) + "\t" +
+				getLengthOfMergeChain() + "\t" +
+				mergeChain.toString() + "\t" +
 				state;
 	}
 	
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
deleted file mode 100644
index a3f0b9f..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.*;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public class ValueWritable implements WritableComparable<ValueWritable> {
-
-	private byte value;
-	private int lengthOfMergeChain;
-	private byte[] mergeChain;
-
-	public ValueWritable() {
-		lengthOfMergeChain = 0;
-	}
-
-	public ValueWritable(byte value, int lengthOfMergeChain, byte[] mergeChain) {
-		this.value = value;
-		this.lengthOfMergeChain = lengthOfMergeChain;
-		this.mergeChain = mergeChain;
-	}
-
-	public byte getValue() {
-		return value;
-	}
-
-	public void setValue(byte value) {
-		this.value = value;
-	}
-
-	public int getLengthOfMergeChain() {
-		return lengthOfMergeChain;
-	}
-
-	public void setLengthOfMergeChain(int lengthOfMergeChain) {
-		this.lengthOfMergeChain = lengthOfMergeChain;
-	}
-
-	public byte[] getMergeChain() {
-		return mergeChain;
-	}
-
-	public void setMergeChain(byte[] mergeChain) {
-		this.mergeChain = mergeChain;
-	}
-
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		value = in.readByte();
-		lengthOfMergeChain = in.readInt();
-		if(lengthOfMergeChain != 0){
-			mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
-			in.readFully(mergeChain);
-		}
-		else
-			mergeChain = new byte[0];
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.writeByte(value);
-		out.writeInt(lengthOfMergeChain);
-		if(lengthOfMergeChain != 0)
-			out.write(mergeChain);
-	}
-
-	@Override
-	public int compareTo(ValueWritable o) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-	
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index 7e56a66..6105f18 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,27 +4,22 @@
 import java.util.logging.Handler;
 import java.util.logging.LogRecord;
 
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
 import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class DataLoadLogFormatter extends Formatter{
-    private BytesWritable key;
+    private VKmerBytesWritable key;
     private KmerCountValue value;
-    private int k;
 
-    public void set(BytesWritable key, 
-    		KmerCountValue value, int k){
-    	this.key = key;
+    public void set(VKmerBytesWritable key, 
+    		KmerCountValue value){
+    	this.key.set(key);
     	this.value = value;
-    	this.k = k;
     }
     public String format(LogRecord record) {
-        StringBuilder builder = new StringBuilder(1000);
+        StringBuilder builder = new StringBuilder(1000); 
         
-        builder.append(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
-							key.getLength())
+        builder.append(key.toString()
 							+ "\t" + value.toString() + "\r\n");
 
         if(!formatMessage(record).equals(""))
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 3af9b6f..d4f03ee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -5,7 +5,7 @@
 import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
 import edu.uci.ics.genomix.pregelix.type.Message;
 import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class LogAlgorithmLogFormatter extends Formatter {
 	//
@@ -13,13 +13,11 @@
     //
     //private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
     private long step;
-    private byte[] sourceVertexId;
-    private byte[] destVertexId;
-    private LogAlgorithmMessageWritable msg;
+    private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
+    private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+    private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
     private int state;
-    private int k;
-    private byte[] mergeChain;
-    private int lengthOfMergeChain;
+    private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);;
     //private boolean testDelete = false;
     /** 0: general operation 
      *  1: testDelete 
@@ -27,60 +25,57 @@
      *  3: testVoteToHalt
      */ 
     private int operation; 
+    
+    public LogAlgorithmLogFormatter(){
+    }
 
-    public void set(long step, byte[] sourceVertexId, 
-    		byte[] destVertexId, LogAlgorithmMessageWritable msg, int state, int k){
+    public void set(long step, VKmerBytesWritable sourceVertexId, 
+    		VKmerBytesWritable destVertexId, LogAlgorithmMessageWritable msg, int state){
     	this.step = step;
-    	this.sourceVertexId = sourceVertexId;
-    	this.destVertexId = destVertexId;
+    	this.sourceVertexId.set(sourceVertexId);
+    	this.destVertexId.set(destVertexId);
     	this.msg = msg;
     	this.state = state;
-    	this.k = k;
     	this.operation = 0;
     }
-    public void setMergeChain(long step, byte[] sourceVertexId, 
-    		int lengthOfMergeChain, byte[] mergeChain, int k){
+    public void setMergeChain(long step, VKmerBytesWritable sourceVertexId, 
+    		VKmerBytesWritable mergeChain){
     	this.reset();
     	this.step = step;
-    	this.sourceVertexId = sourceVertexId;
-    	this.lengthOfMergeChain = lengthOfMergeChain;
-    	this.mergeChain = mergeChain;
-    	this.k = k;
+    	this.sourceVertexId.set(sourceVertexId);
+    	this.mergeChain.set(mergeChain);
     	this.operation = 2;
     }
-    public void setVotoToHalt(long step, byte[] sourceVertexId, int k){
+    public void setVotoToHalt(long step, VKmerBytesWritable sourceVertexId){
     	this.reset();
     	this.step = step;
-    	this.sourceVertexId = sourceVertexId;
-    	this.k = k;
+    	this.sourceVertexId.set(sourceVertexId);
     	this.operation = 3;
     }
     public void reset(){
-    	this.sourceVertexId = null;
-    	this.destVertexId = null;
-    	this.msg = null;
+    	this.sourceVertexId = new VKmerBytesWritable(1);
+    	this.destVertexId = new VKmerBytesWritable(1);
+    	this.msg = new LogAlgorithmMessageWritable();
     	this.state = 0;
-    	this.k = 0;
-    	this.mergeChain = null;
-    	this.lengthOfMergeChain = 0;
+    	this.mergeChain = new VKmerBytesWritable(1);
     }
     public String format(LogRecord record) {
         StringBuilder builder = new StringBuilder(1000);
-        String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+        String source = sourceVertexId.toString();
         String chain = "";
         
         builder.append("Step: " + step + "\r\n");
         builder.append("Source Code: " + source + "\r\n");
         if(operation == 0){
-	        if(destVertexId != null){
-	        	String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+	        if(destVertexId.getKmerLength() != -1){
+	        	String dest = destVertexId.toString();
 		        builder.append("Send message to " + "\r\n");
 		        builder.append("Destination Code: " + dest + "\r\n");
 	        }
 	        builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
 	        	
-	        if(msg.getLengthOfChain() != 0){
-	        	chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+	        if(msg.getLengthOfChain() != -1){
+	        	chain = msg.getChainVertexId().toString();
 	        	builder.append("Chain Message: " + chain + "\r\n");
 	        	builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
 	        }
@@ -88,9 +83,9 @@
 	        builder.append("State is: " + State.STATE_CONTENT.getContentFromCode(state) + "\r\n");
         }
         if(operation == 2){
-        	chain = Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length);
+        	chain = mergeChain.toString();
         	builder.append("Merge Chain: " + chain + "\r\n");
-        	builder.append("Merge Chain Length: " + lengthOfMergeChain + "\r\n");
+        	builder.append("Merge Chain Length: " + mergeChain.getKmerLength() + "\r\n");
         }
         if(operation == 3)
         	builder.append("Vote to halt!");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index c337a16..332d6d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,8 +2,8 @@
 
 import java.util.logging.*;
 
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class NaiveAlgorithmLogFormatter extends Formatter {
 	//
@@ -11,22 +11,20 @@
     //
     //private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
     private long step;
-    private byte[] sourceVertexId;
-    private byte[] destVertexId;
-    private MessageWritable msg;
-    private int k;
+    private VKmerBytesWritable sourceVertexId;
+    private VKmerBytesWritable destVertexId;
+    private NaiveAlgorithmMessageWritable msg;
 
-    public void set(long step, byte[] sourceVertexId, 
-    		byte[] destVertexId, MessageWritable msg, int k){
+    public void set(long step, VKmerBytesWritable sourceVertexId, 
+    		VKmerBytesWritable destVertexId, NaiveAlgorithmMessageWritable msg){
     	this.step = step;
-    	this.sourceVertexId = sourceVertexId;
-    	this.destVertexId = destVertexId;
+    	this.sourceVertexId.set(sourceVertexId);
+    	this.destVertexId.set(destVertexId);
     	this.msg = msg;
-    	this.k = k;
     }
     public String format(LogRecord record) {
         StringBuilder builder = new StringBuilder(1000);
-        String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+        String source = sourceVertexId.toString();
         
         String chain = "";
         
@@ -35,11 +33,11 @@
         
         if(destVertexId != null){
         	builder.append("Send message to " + "\r\n");
-        	String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+        	String dest = destVertexId.toString();
         	builder.append("Destination Code: " + dest + "\r\n");
         }
         if(msg.getLengthOfChain() != 0){
-        	chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+        	chain = msg.getChainVertexId().toString();
         	builder.append("Chain Message: " + chain + "\r\n");
         	builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
similarity index 62%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
index 580e1fe..6fef3a6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
@@ -1,16 +1,15 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.operator;
 
 import java.util.Iterator;
 
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 
 import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
@@ -18,7 +17,7 @@
  * vertexId: BytesWritable
  * vertexValue: ByteWritable
  * edgeValue: NullWritable
- * message: MessageWritable
+ * message: NaiveAlgorithmMessageWritable
  * 
  * DNA:
  * A: 00
@@ -42,14 +41,13 @@
  * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
  * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
  */
-public class LoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+public class LoadGraphVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
 
 	/**
 	 * For test, just output original file
 	 */
 	@Override
-	public void compute(Iterator<MessageWritable> msgIterator) {
-		deleteVertex(getVertexId());
+	public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
 		voteToHalt();
 	}
 
@@ -57,15 +55,15 @@
 	 * @param args
 	 */
 	public static void main(String[] args) throws Exception {
-		//final int k = Integer.parseInt(args[0]);
         PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
         job.setVertexClass(LoadGraphVertex.class);
         /**
          * BinaryInput and BinaryOutput
          */
-        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class); 
-        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class); 
-        job.setOutputKeyClass(BytesWritable.class);
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); 
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class); 
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
         job.setOutputValueClass(ValueStateWritable.class);
         Client.run(args, job);
 	}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..fa77d43
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -0,0 +1,371 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+/*
+ * vertexId: KmerBytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10	0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable. 
+ */
+public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{	
+	
+	public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
+	public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+	public static int kmerSize = -1;
+	private int maxIteration = -1;
+	
+	private ValueStateWritable vertexVal = new ValueStateWritable();
+	
+	private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+	
+	private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+	private VKmerBytesWritable vertexId = new VKmerBytesWritable(1); 
+	private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1); 
+	private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+	private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+	/**
+	 * initiate kmerSize, maxIteration
+	 */
+	public void initVertex(){
+		if(kmerSize == -1)
+			kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0) 
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+		vertexId.set(getVertexId());
+		vertexVal = getVertexValue();
+	}
+	/**
+	 * get destination vertex
+	 */
+	public VKmerBytesWritable getNextDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+		return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+	}
+	
+	public VKmerBytesWritable getPreDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+		return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+	}
+	
+	public VKmerBytesWritable getNextDestVertexIdFromBitmap(VKmerBytesWritable chainVertexId, byte adjMap){
+		return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
+	}
+	
+	public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+		lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+		return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+	}
+	/**
+	 * head send message to all next nodes
+	 */
+	public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+		for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+			if((adjMap & (1 << x)) != 0){
+				destVertexId.set(getNextDestVertexId(vertexId, x));
+				sendMsg(destVertexId, msg);
+			}
+		}
+	}
+	/**
+	 * head send message to all previous nodes
+	 */
+	public void sendMsgToAllPreviousNodes(VKmerBytesWritable vertexId, byte adjMap){
+		for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+			if(((adjMap >> 4) & (1 << x)) != 0){
+				destVertexId.set(getPreDestVertexId(vertexId, x));
+				sendMsg(destVertexId, msg);
+			}
+		}
+	}
+
+	/**
+	 * set vertex state
+	 */
+	public void setState(){
+		if(msg.getMessage() == Message.START && 
+				(vertexVal.getState() == State.MID_VERTEX || vertexVal.getState() == State.END_VERTEX)){
+			vertexVal.setState(State.START_VERTEX);
+			setVertexValue(vertexVal);
+		}
+		else if(msg.getMessage() == Message.END && vertexVal.getState() == State.MID_VERTEX){
+			vertexVal.setState(State.END_VERTEX);
+			setVertexValue(vertexVal);
+			voteToHalt();
+		}
+		else
+			voteToHalt();
+	}
+	/**
+	 * send start message to next node
+	 */
+	public void sendStartMsgToNextNode(){
+		msg.setMessage(Message.START);
+		msg.setSourceVertexId(vertexId);
+		sendMsg(destVertexId, msg);
+		voteToHalt();
+	}
+	/**
+	 * send end message to next node
+	 */
+	public void sendEndMsgToNextNode(){
+		msg.setMessage(Message.END);
+		msg.setSourceVertexId(vertexId);
+		sendMsg(destVertexId, msg);
+		voteToHalt();
+	}
+	/**
+	 * send non message to next node
+	 */
+	public void sendNonMsgToNextNode(){
+		msg.setMessage(Message.NON);
+		msg.setSourceVertexId(vertexId);
+		sendMsg(destVertexId, msg);
+	}
+	/**
+	 * head send message to path
+	 */
+	public void sendMsgToPathVertex(VKmerBytesWritable chainVertexId, byte adjMap){
+		if(GeneCode.getGeneCodeFromBitMap((byte)(vertexVal.getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
+			voteToHalt();
+		else{
+			destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
+			if(vertexVal.getState() == State.START_VERTEX){
+				sendStartMsgToNextNode();
+			}
+			else if(vertexVal.getState() != State.END_VERTEX && vertexVal.getState() != State.FINAL_DELETE){
+				sendEndMsgToNextNode();
+			}
+		}
+	}
+	/**
+	 * path send message to head 
+	 */
+	public void responseMsgToHeadVertex(){
+		if(vertexVal.getLengthOfMergeChain() == -1){
+			vertexVal.setMergeChain(vertexId);
+			setVertexValue(vertexVal);
+		}
+		msg.set(msg.getSourceVertexId(), vertexVal.getMergeChain(), vertexVal.getAdjMap(), msg.getMessage(), vertexVal.getState());
+		setMessageType(msg.getMessage());
+		destVertexId.set(msg.getSourceVertexId());
+		sendMsg(destVertexId,msg);
+	}
+	/**
+	 * set message type
+	 */
+	public void setMessageType(int message){
+		//kill Message because it has been merged by the head
+		if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+			msg.setMessage(Message.END);
+			vertexVal.setState(State.FINAL_DELETE);
+			setVertexValue(vertexVal);
+			//deleteVertex(getVertexId());
+		}
+		else
+			msg.setMessage(Message.NON);
+		
+		if(message == Message.START){
+			vertexVal.setState(State.TODELETE);
+			setVertexValue(vertexVal);
+		}
+	}
+	/**
+	 *  set vertexValue's state chainVertexId, value
+	 */
+	public void setVertexValueAttributes(){
+		if(msg.getMessage() == Message.END){
+			if(vertexVal.getState() != State.START_VERTEX)
+				vertexVal.setState(State.END_VERTEX);
+			else
+				vertexVal.setState(State.FINAL_VERTEX);
+		}
+			
+		if(getSuperstep() == 5)
+			chainVertexId.set(vertexId);
+		else
+			chainVertexId.set(vertexVal.getMergeChain());
+		lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+		chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
+		vertexVal.setMergeChain(chainVertexId);
+		
+		byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
+		vertexVal.setAdjMap(tmpVertexValue);
+	}
+	/**
+	 *  send message to self
+	 */
+	public void sendMsgToSelf(){
+		if(msg.getMessage() != Message.END){
+			setVertexValue(vertexVal);
+			msg.reset(); //reset
+			msg.setAdjMap(vertexVal.getAdjMap());
+			sendMsg(vertexId,msg);
+		}
+	}
+	/**
+	 * start sending message
+	 */
+	public void startSendMsg(){
+		if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+			msg.set(vertexId, chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
+			sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+			voteToHalt();
+		}
+		if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+			msg.set(vertexId, chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
+			sendMsgToAllPreviousNodes(vertexId, vertexVal.getAdjMap());
+			voteToHalt();
+		}
+		if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+			vertexVal.setState(State.MID_VERTEX);
+			setVertexValue(vertexVal);
+		}
+	}
+	/**
+	 *  initiate head, rear and path node
+	 */
+	public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+		while(msgIterator.hasNext()){
+			if(!GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+				msgIterator.next();
+				voteToHalt();
+			}
+			else{
+				msg = msgIterator.next();
+				setState();
+			}
+		}
+	}
+	/**
+	 * head send message to path
+	 */
+	public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+		if(getSuperstep() == 3){
+			msg.reset();
+			sendMsgToPathVertex(vertexId, vertexVal.getAdjMap());
+		}
+		else{
+			if(msgIterator.hasNext()){
+				msg = msgIterator.next();
+				sendMsgToPathVertex(vertexVal.getMergeChain(), msg.getAdjMap());
+			}
+		}
+	}
+	/**
+	 * path response message to head
+	 */
+	public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+		if(msgIterator.hasNext()){
+			msg = msgIterator.next();
+			responseMsgToHeadVertex();
+			//voteToHalt();
+		}
+		else{
+			if(getVertexValue().getState() != State.START_VERTEX 
+					&& getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+				//vertexVal.setState(State.KILL_SELF);
+				//setVertexValue(vertexVal);
+				//voteToHalt();
+				deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+			}
+		}
+	}
+	/**
+	 * merge chainVertex and store in vertexVal.chainVertexId
+	 */
+	public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+		if(msgIterator.hasNext()){
+			msg = msgIterator.next();
+			setVertexValueAttributes();
+			sendMsgToSelf();
+		}
+		if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+			voteToHalt();
+		}
+		if(vertexVal.getState() == State.FINAL_VERTEX){
+			//String source = vertexVal.getMergeChain().toString();
+			voteToHalt();
+		}
+	}
+	@Override
+	public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+		initVertex();
+		if(vertexVal.getState() != State.NON_EXIST && vertexVal.getState() != State.KILL_SELF){
+			if (getSuperstep() == 1) 
+				startSendMsg();
+			else if(getSuperstep() == 2)
+				initState(msgIterator);
+			else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+				sendMsgToPathVertex(msgIterator);
+			}
+			else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+				responseMsgToHeadVertex(msgIterator);
+			}
+			else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
+				if(vertexVal.getState() == State.TODELETE){ //|| vertexVal.getState() == State.KILL_SELF)
+					//vertexVal.setState(State.NON_EXIST);
+					//setVertexValue(vertexVal);
+					//voteToHalt();
+					deleteVertex(getVertexId()); //killSelf  
+				}
+				else{
+					mergeChainVertex(msgIterator);
+				}
+			}
+		}
+	}
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
+        job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class); 
+        job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class); 
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..8a70bd5
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -0,0 +1,211 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * successor (outgoing) node -- low nibble, bit = 1 << geneCode:
+ *  A 00000001 1
+ *  C 00000010 2
+ *  G 00000100 4
+ *  T 00001000 8
+ * predecessor (incoming) node -- high nibble, bit = 1 << (geneCode + 4):
+ *  A 00010000 16
+ *  C 00100000 32
+ *  G 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10	0001,0010,
+ * That means that the vertexId is ACG, its successor node is A and its predecessor node is C.
+ * The successor and predecessor nodes are stored in vertexValue; edgeValue is unused.
+ * The details about the message format are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class NaiveAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+	
+	public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
+	public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+	public static int kmerSize = -1;
+	private int maxIteration = -1;
+	
+	private ValueStateWritable vertexVal = new ValueStateWritable();
+
+	private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
+
+	private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+	private VKmerBytesWritable vertexId = new VKmerBytesWritable(1); 
+	private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1); 
+	private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+	private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
+	/**
+	 * initiate kmerSize, maxIteration
+	 */
+	public void initVertex(){
+		if(kmerSize == -1)
+			kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0) 
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+		vertexId.set(getVertexId());
+		vertexVal = getVertexValue();
+	}
+	public void findDestination(){
+		destVertexId.set(msg.getSourceVertexId());
+	}
+	/**
+	 * get destination vertex
+	 */
+	public VKmerBytesWritable getDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+		return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+	}
+
+	public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+		lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+		return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+	}
+	/**
+	 * head send message to all next nodes
+	 */
+	public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+		for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+			if((adjMap & (1 << x)) != 0){
+				destVertexId.set(getDestVertexId(vertexId, x));
+				sendMsg(destVertexId, msg);
+			}
+		}
+	}
+	/**
+	 * initiate chain vertex
+	 */
+	public void initChainVertex(){
+		if(!msg.isRear()){
+			findDestination();
+			if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+				chainVertexId.set(vertexId);
+				msg.set(vertexId, chainVertexId, vertexId, vertexVal.getAdjMap(), false);
+				sendMsg(destVertexId,msg);
+			}else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap()))
+				voteToHalt();
+		}
+	}
+	/**
+	 * head node sends message to path node
+	 */
+	public void sendMsgToPathVertex(){
+		if(!msg.isRear()){
+			destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
+		}else{
+			destVertexId.set(msg.getHeadVertexId());
+		}
+		msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+		sendMsg(destVertexId,msg);
+	}
+	/**
+	 * path node sends message back to head node
+	 */
+	public void responseMsgToHeadVertex(){
+		if(!msg.isRear()){
+			findDestination();
+			if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+				chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
+						vertexId.getGeneCodeAtPosition(kmerSize - 1));
+				deleteVertex(getVertexId());
+				//vertexVal.setState(State.NON_EXIST); 
+				//setVertexValue(vertexVal);
+				msg.set(vertexId, chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
+				sendMsg(destVertexId,msg);
+			}
+			else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+				msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
+				sendMsg(destVertexId,msg);
+			}
+		}else{// is Rear
+			chainVertexId.set(msg.getSourceVertexId());
+			vertexVal.set(GraphVertexOperation.updateRightNeighberByVertexId(vertexVal.getAdjMap(), chainVertexId, kmerSize),
+					State.START_VERTEX, msg.getChainVertexId());
+			setVertexValue(vertexVal);
+			//String source = msg.getChainVertexId().toString();
+			//System.out.print("");
+		}
+	}
+	
+	@Override
+	public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+		initVertex();
+		if(vertexVal.getState() != State.NON_EXIST){
+			if (getSuperstep() == 1) {
+				if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){ 
+					msg.set(vertexId, chainVertexId, vertexId, (byte)0, false);
+					sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+				}
+			}
+			else if(getSuperstep() == 2){
+				if(msgIterator.hasNext()){
+					msg = msgIterator.next();
+					initChainVertex();
+				}
+			}
+			//head node sends message to path node
+			else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+				while (msgIterator.hasNext()){
+					msg = msgIterator.next();
+					sendMsgToPathVertex();
+				}
+			}
+			//path node sends message back to head node
+			else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+				 while(msgIterator.hasNext()){
+					msg = msgIterator.next();
+					responseMsgToHeadVertex();
+				}
+			}
+		}
+		voteToHalt();
+	}
+
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
+        job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); 
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class); 
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
index d9f0efe..c7349dd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
@@ -5,14 +5,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;
 
-
 public class CombineSequenceFile {
 
 	/**
@@ -25,24 +23,23 @@
 		Configuration conf = new Configuration();
 		FileSystem fileSys = FileSystem.get(conf);
 		
-		Path p = new Path("data/SinglePath_55");
-		Path p2 = new Path("data/result");
-		Path outFile = new Path(p2, "output"); 
+		Path p = new Path("output");
+		//Path p2 = new Path("data/result");
+		Path outFile = new Path("output"); 
 		SequenceFile.Reader reader;
 	    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
-	         outFile, BytesWritable.class, KmerCountValue.class, 
+	         outFile, KmerBytesWritable.class, KmerCountValue.class, 
 	         CompressionType.NONE);
-	    BytesWritable key = new BytesWritable();
+	    KmerBytesWritable key = new KmerBytesWritable(kmerSize);
 	    KmerCountValue value = new KmerCountValue();
 	    
-	    File dir = new File("data/SinglePath_55");
+	    File dir = new File("output");
 		for(File child : dir.listFiles()){
 			String name = child.getAbsolutePath();
 			Path inFile = new Path(p, name);
 			reader = new SequenceFile.Reader(fileSys, inFile, conf);
 			while (reader.next(key, value)) {
-				System.out.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
-						key.getLength())
+				System.out.println(key.toString()
 						+ "\t" + value.toString());
 				writer.append(key, value);
 			}
@@ -53,8 +50,7 @@
 		
 		reader = new SequenceFile.Reader(fileSys, outFile, conf);
 		while (reader.next(key, value)) {
-			System.err.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
-					key.getLength())
+			System.err.println(key.toString()
 					+ "\t" + value.toString());
 		}
 		reader.close();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
new file mode 100644
index 0000000..d97a2fd
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+
+public class GenerateSmallFile {
+
+	public static void generateNumOfLinesFromBigFile(Path inFile, Path outFile, int numOfLines) throws IOException{
+		Configuration conf = new Configuration();
+		FileSystem fileSys = FileSystem.get(conf);
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+	    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+	         outFile, KmerBytesWritable.class, KmerCountValue.class, 
+	         CompressionType.NONE);
+	    KmerBytesWritable outKey = new KmerBytesWritable(55);
+	    KmerCountValue outValue = new KmerCountValue();
+	    int i = 0;
+	    
+	    for(i = 0; i < numOfLines; i++){
+	    	 //System.out.println(i);
+	    	 reader.next(outKey, outValue);
+	    	 writer.append(outKey, outValue);
+	    }
+	    writer.close();
+	    reader.close();
+	}
+	/**
+	 * @param args
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws IOException {
+		// Copy the first 5,000,000 records of data/part-0 into a smaller sequence file.
+		Path dir = new Path("data");
+		Path inFile = new Path(dir, "part-0");
+		Path outFile = new Path(dir, "part-0-out-5000000");
+		generateNumOfLinesFromBigFile(inFile,outFile,5000000);
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 18214a8..c759261 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -7,30 +7,28 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 
 public class GenerateTextFile {
 
 	public static void generate() throws IOException{
-		BufferedWriter bw = new BufferedWriter(new FileWriter("text/new_SimplePath"));
+		BufferedWriter bw = new BufferedWriter(new FileWriter("text/log_TreePath"));
 		Configuration conf = new Configuration();
 		FileSystem fileSys = FileSystem.get(conf);
 		for(int i = 0; i < 2; i++){
 			Path path = new Path("output/part-" + i);
 			SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
-		    BytesWritable key = new BytesWritable();
+			KmerBytesWritable key = new KmerBytesWritable(5);
 		    ValueStateWritable value = new ValueStateWritable();
 		    
 		    while(reader.next(key, value)){
 				if (key == null || value == null){
 					break;
 				}
-				bw.write(Kmer.recoverKmerFrom(5, key.getBytes(), 0,
-						key.getLength())
+				bw.write(key.toString()
 						+ "\t" + value.toString());
 				bw.newLine();
 		    }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 61bcb0c..ae3c621 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -9,6 +9,7 @@
 	public static final int FINAL_VERTEX = 5;
 	public static final int FINAL_DELETE = 6;
 	public static final int KILL_SELF = 7;
+	public static final int NON_EXIST = 8;
 	
 	public final static class STATE_CONTENT{
 
@@ -39,6 +40,9 @@
 			case KILL_SELF:
 				r = "KILL_SELF";
 				break;
+			case NON_EXIST:
+				r = "NON_EXIST";
+				break;
 			}
 			return r;
 		}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
similarity index 64%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
index 553de45..5cb0c2b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
@@ -1,16 +1,14 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.util;
 
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class GraphVertexOperation {
 	
 	/**
 	 *  generate the valid data(byte[]) from BytesWritable
 	 */
-	public static byte[] generateValidDataFromBytesWritable(BytesWritable bw){
+	public static byte[] generateValidDataFromBytesWritable(VKmerBytesWritable bw){
 		byte[] wholeBytes = bw.getBytes();
 		int validNum = bw.getLength();
 		byte[] validBytes = new byte[validNum];
@@ -23,7 +21,7 @@
 	 * @param vertexValue 
 	 */
 	public static boolean isPathVertex(byte value){
-		if(KmerUtil.inDegree(value) == 1 && KmerUtil.outDegree(value) == 1)
+		if(GeneCode.inDegree(value) == 1 && GeneCode.outDegree(value) == 1)
 			return true;
 		return false;
 	}
@@ -32,7 +30,7 @@
 	 * @param vertexValue 
 	 */
 	public static boolean isHeadVertex(byte value){
-		if(KmerUtil.outDegree(value) > 0 && !isPathVertex(value))
+		if(GeneCode.outDegree(value) > 0 && !isPathVertex(value))
 			return true;
 		return false;
 	}
@@ -41,18 +39,17 @@
 	 * @param vertexValue 
 	 */
 	public static boolean isRearVertex(byte value){
-		if(KmerUtil.inDegree(value) > 0 && !isPathVertex(value))
+		if(GeneCode.inDegree(value) > 0 && !isPathVertex(value))
 			return true;
 		return false;
 	}
 	/**
 	 * update right neighber based on next vertexId
 	 */
-	public static byte updateRightNeighberByVertexId(byte oldVertexValue, byte[] neighberVertexId, int k){
+	public static byte updateRightNeighberByVertexId(byte oldVertexValue, VKmerBytesWritable neighberVertex, int k){
+		byte geneCode = neighberVertex.getGeneCodeAtPosition(k-1);
 		
-		String neighberVertex = Kmer.recoverKmerFrom(k, neighberVertexId, 0, neighberVertexId.length);
-		
-		byte newBit = Kmer.GENE_CODE.getAdjBit((byte)neighberVertex.charAt(neighberVertex.length() - 1));
+		byte newBit = GeneCode.getAdjBit(geneCode);
 		return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
 	}
 	/**
@@ -61,4 +58,5 @@
 	public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
 		return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
 	}
+
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 763d46e..f03a4ab 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -5,19 +5,18 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-import edu.uci.ics.genomix.pregelix.LoadGraphVertex;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LoadGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 
@@ -30,10 +29,11 @@
     private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
     	PregelixJob job = new PregelixJob(jobName);
     	job.setVertexClass(LoadGraphVertex.class);
-    	job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
-        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
-        job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(ByteWritable.class);
+    	job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -45,15 +45,15 @@
     
     private static void generateMergeGraphJob(String jobName, String outputPath) throws IOException {
     	PregelixJob job = new PregelixJob(jobName);
-    	job.setVertexClass(MergeGraphVertex.class);
-    	job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
-        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+    	job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+    	job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
         job.setDynamicVertexValueSize(true);
-        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputKeyClass(KmerBytesWritable.class);
         job.setOutputValueClass(ValueStateWritable.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
-        job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, 55);
+        job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
     
@@ -63,15 +63,15 @@
     
     private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
     	PregelixJob job = new PregelixJob(jobName);
-    	job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
-        job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class); 
-        job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
+    	job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+        job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class); 
+        job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
         job.setDynamicVertexValueSize(true);
-        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputKeyClass(KmerBytesWritable.class);
         job.setOutputValueClass(ValueStateWritable.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
-        job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, 5);
+        job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
     
@@ -88,8 +88,6 @@
 		//genLoadGraph();
 		//genMergeGraph();
 		genLogAlgorithmForMergeGraph();
-		//genSequenceLoadGraph();
-		//genBasicBinaryLoadGraph();
 	}
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index ffc1a25..fd0749a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
 	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
 	private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-	private static final String DATA_PATH = "data/result/TreePath";// sequenceShortFileMergeTest
+	private static final String DATA_PATH = "data/sequencefile/LongPath";
 	private static final String HDFS_PATH = "/webmap/";
 
 	private static final String HYRACKS_APP_NAME = "pregelix";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
index 4ea3c1d..8ac1b09 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
@@ -20,6 +20,13 @@
 
 public class TestUtils {
 
+    public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
+        String[] fileNames = expectedFileDir.list();
+        for (String fileName : fileNames) {
+            compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
+        }
+    }
+
     public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
         BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -28,7 +35,6 @@
         try {
             while ((lineExpected = readerExpected.readLine()) != null) {
                 lineActual = readerActual.readLine();
-                // Assert.assertEquals(lineExpected, lineActual);
                 if (lineActual == null) {
                     throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
                 }
@@ -62,8 +68,10 @@
             if (row1.equals(row2))
                 continue;
 
-            String[] fields1 = row1.split(" ");
-            String[] fields2 = row2.split(" ");
+            boolean spaceOrTab = false;
+            spaceOrTab = row1.contains(" ");
+            String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+            String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
 
             for (int j = 0; j < fields1.length; j++) {
                 if (fields1[j].equals(fields2[j])) {
@@ -76,7 +84,7 @@
                     float float1 = (float) double1.doubleValue();
                     float float2 = (float) double2.doubleValue();
 
-                    if (Math.abs(float1 - float2) == 0)
+                    if (Math.abs(float1 - float2) < 1.0e-7)
                         continue;
                     else {
                         return false;