Merge branch 'genomix/velvet_graphbuilding' into genomix/fullstack_genomix

Conflicts:
	genomix/genomix-data/.classpath
	genomix/genomix-data/.project
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
diff --git a/genomix/genomix-data/.classpath b/genomix/genomix-data/.classpath
deleted file mode 100644
index daa9d4b..0000000
--- a/genomix/genomix-data/.classpath
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"/>
-	<classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
-	<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
-	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/genomix/genomix-data/.project b/genomix/genomix-data/.project
deleted file mode 100644
index 4628890..0000000
--- a/genomix/genomix-data/.project
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>genomix-data</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.eclipse.m2e.core.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-		<nature>org.eclipse.m2e.core.maven2Nature</nature>
-	</natures>
-</projectDescription>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
similarity index 93%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
index 68dec30..a030f0b 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
@@ -13,7 +13,9 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.type;
+package edu.uci.ics.genomix.data;
+
+import edu.uci.ics.genomix.type.GeneCode;
 
 public class KmerUtil {
     public static final String empty = "";
@@ -34,7 +36,7 @@
     public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
         StringBuilder strKmer = new StringBuilder();
         int byteId = keyStart + keyLength - 1;
-        if (byteId < 0) {
+        if (byteId < 0 || k < 1) {
             return empty;
         }
         byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
new file mode 100644
index 0000000..219def6
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.genomix.data;
+
+public class Marshal {
+    public static int getInt(byte[] bytes, int offset) {
+        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+                + ((bytes[offset + 3] & 0xff) << 0);
+    }
+    
+    public static void putInt(int val, byte[] bytes, int offset) {
+        bytes[offset] = (byte)((val >>> 24) & 0xFF);        
+        bytes[offset + 1] = (byte)((val >>> 16) & 0xFF);
+        bytes[offset + 2] = (byte)((val >>>  8) & 0xFF);
+        bytes[offset + 3] = (byte)((val >>>  0) & 0xFF);
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 5be5f83..03e2fd9 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -50,96 +50,10 @@
     }
 
     public static byte getSymbolFromCode(byte code) {
-        if (code > 3) {
-            return '!';
+        if (code > 3 || code < 0) {
+            throw new IllegalArgumentException("Not such gene code");
         }
         return GENE_SYMBOL[code];
     }
 
-    public static byte getAdjBit(byte t) {
-        byte r = 0;
-        switch (t) {
-            case 'A':
-            case 'a':
-                r = 1 << A;
-                break;
-            case 'C':
-            case 'c':
-                r = 1 << C;
-                break;
-            case 'G':
-            case 'g':
-                r = 1 << G;
-                break;
-            case 'T':
-            case 't':
-                r = 1 << T;
-                break;
-        }
-        return r;
-    }
-
-    /**
-     * It works for path merge. Merge the kmer by his next, we need to make sure
-     * the @{t} is a single neighbor.
-     * 
-     * @param t
-     *            the neighbor code in BitMap
-     * @return the genecode
-     */
-    public static byte getGeneCodeFromBitMap(byte t) {
-        switch (t) {
-            case 1 << A:
-                return A;
-            case 1 << C:
-                return C;
-            case 1 << G:
-                return G;
-            case 1 << T:
-                return T;
-        }
-        return -1;
-    }
-
-    public static byte getBitMapFromGeneCode(byte t) {
-        return (byte) (1 << t);
-    }
-
-    public static int countNumberOfBitSet(int i) {
-        int c = 0;
-        for (; i != 0; c++) {
-            i &= i - 1;
-        }
-        return c;
-    }
-
-    public static int inDegree(byte bitmap) {
-        return countNumberOfBitSet((bitmap >> 4) & 0x0f);
-    }
-
-    public static int outDegree(byte bitmap) {
-        return countNumberOfBitSet(bitmap & 0x0f);
-    }
-
-    public static byte mergePreNextAdj(byte pre, byte next) {
-        return (byte) (pre << 4 | (next & 0x0f));
-    }
-
-    public static String getSymbolFromBitMap(byte code) {
-        int left = (code >> 4) & 0x0F;
-        int right = code & 0x0F;
-        StringBuilder str = new StringBuilder();
-        for (int i = A; i <= T; i++) {
-            if ((left & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        str.append('|');
-        for (int i = A; i <= T; i++) {
-            if ((right & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        return str.toString();
-    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index fd4c252..8f9094f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -24,8 +24,10 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
+import edu.uci.ics.genomix.data.KmerUtil;
+
 /**
- * Fix kmer length byteswritable
+ * Variable kmer length byteswritable
  * It was used to generate the graph in which phase the kmer length doesn't change.
  * Thus the size of bytes doesn't change either.
  */
@@ -38,25 +40,15 @@
 
     protected int size;
     protected byte[] bytes;
+    protected int offset;
     protected int kmerlength;
 
-    @Deprecated
     public KmerBytesWritable() {
-        this(0, EMPTY_BYTES);
+        this(0, EMPTY_BYTES, 0);
     }
 
-    public KmerBytesWritable(int k, byte[] storage) {
-        this.kmerlength = k;
-        if (k > 0) {
-            this.size = KmerUtil.getByteNumFromK(kmerlength);
-            this.bytes = storage;
-            if (this.bytes.length < size) {
-                throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
-            }
-        } else {
-            this.bytes = storage;
-            this.size = 0;
-        }
+    public KmerBytesWritable(int k, byte[] storage, int offset) {
+        setNewReference(k, storage, offset);
     }
 
     /**
@@ -73,28 +65,92 @@
         } else {
             this.bytes = EMPTY_BYTES;
         }
+        this.offset = 0;
     }
 
     public KmerBytesWritable(KmerBytesWritable right) {
-        if (right != null) {
-            this.kmerlength = right.kmerlength;
-            this.size = right.size;
-            this.bytes = new byte[right.size];
-            set(right);
-        }else{
-            this.kmerlength = 0;
-            this.size = 0;
-            this.bytes = EMPTY_BYTES;
+        this(right.kmerlength);
+        set(right);
+    }
+
+    public void set(KmerBytesWritable newData) {
+        if (newData == null) {
+            this.set(0, EMPTY_BYTES, 0);
+        } else {
+            this.set(newData.kmerlength, newData.bytes, 0);
+        }
+    }
+
+    public void set(byte[] newData, int offset) {
+        if (kmerlength > 0) {
+            System.arraycopy(newData, offset, bytes, offset, size);
+        }
+    }
+
+    public void set(int k, byte[] newData, int offset) {
+        reset(k);
+        if (k > 0) {
+            System.arraycopy(newData, offset, bytes, this.offset, size);
+        }
+    }
+
+    /**
+     * Reset array by kmerlength
+     * 
+     * @param k
+     */
+    public void reset(int k) {
+        this.kmerlength = k;
+        setSize(KmerUtil.getByteNumFromK(k));
+        clearLeadBit();
+    }
+
+    public void setNewReference(byte[] newData, int offset) {
+        this.bytes = newData;
+        this.offset = offset;
+        if (newData.length - offset < size) {
+            throw new IllegalArgumentException("Not given enough space");
+        }
+    }
+
+    public void setNewReference(int k, byte[] newData, int offset) {
+        this.kmerlength = k;
+        this.size = KmerUtil.getByteNumFromK(k);
+        setNewReference(newData, offset);
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+        this.size = size;
+    }
+
+    protected int getCapacity() {
+        return bytes.length;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap != getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (new_cap < size) {
+                size = new_cap;
+            }
+            if (size != 0) {
+                System.arraycopy(bytes, offset, new_data, 0, size);
+            }
+            bytes = new_data;
+            offset = 0;
         }
     }
 
     public byte getGeneCodeAtPosition(int pos) {
         if (pos >= kmerlength) {
-            return -1;
+            throw new IllegalArgumentException("gene position out of bound");
         }
         int posByte = pos / 4;
         int shift = (pos % 4) << 1;
-        return (byte) ((bytes[size - 1 - posByte] >> shift) & 0x3);
+        return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
     }
 
     public int getKmerLength() {
@@ -106,6 +162,10 @@
         return bytes;
     }
 
+    public int getOffset() {
+        return offset;
+    }
+
     @Override
     public int getLength() {
         return size;
@@ -128,16 +188,21 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[0] = l;
+            bytes[offset] = l;
         }
     }
 
+    public void setByRead(int k, byte[] array, int start) {
+        reset(k);
+        setByRead(array, start);
+    }
+
     /**
      * Compress Reversed Kmer into bytes array AATAG will compress as
      * [0x000A,0xATAG]
@@ -156,16 +221,21 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[0] = l;
+            bytes[offset] = l;
         }
     }
 
+    public void setByReadReverse(int k, byte[] array, int start) {
+        reset(k);
+        setByReadReverse(array, start);
+    }
+
     /**
      * Shift Kmer to accept new char input
      * 
@@ -185,14 +255,14 @@
      * @return the shift out gene, in gene code format
      */
     public byte shiftKmerWithNextCode(byte c) {
-        byte output = (byte) (bytes[size - 1] & 0x03);
+        byte output = (byte) (bytes[offset + size - 1] & 0x03);
         for (int i = size - 1; i > 0; i--) {
-            byte in = (byte) (bytes[i - 1] & 0x03);
-            bytes[i] = (byte) (((bytes[i] >>> 2) & 0x3f) | (in << 6));
+            byte in = (byte) (bytes[offset + i - 1] & 0x03);
+            bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
         }
         int pos = ((kmerlength - 1) % 4) << 1;
         byte code = (byte) (c << pos);
-        bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
+        bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
         clearLeadBit();
         return output;
     }
@@ -217,34 +287,44 @@
      */
     public byte shiftKmerWithPreCode(byte c) {
         int pos = ((kmerlength - 1) % 4) << 1;
-        byte output = (byte) ((bytes[0] >> pos) & 0x03);
+        byte output = (byte) ((bytes[offset] >> pos) & 0x03);
         for (int i = 0; i < size - 1; i++) {
-            byte in = (byte) ((bytes[i + 1] >> 6) & 0x03);
-            bytes[i] = (byte) ((bytes[i] << 2) | in);
+            byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+            bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
         }
-        bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
+        bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
         clearLeadBit();
         return output;
     }
 
+    /**
+     * Merge kmer with next neighbor in gene-code format.
+     * The k of new kmer will increase by 1
+     * e.g. AAGCT merge with A => AAGCTA
+     * 
+     * @param k
+     *            :input k of kmer
+     * @param nextCode
+     *            : next neighbor in gene-code format
+     * @return the merged Kmer, this K of this Kmer is k+1
+     */
+    public void mergeKmerWithNextCode(byte nextCode) {
+        this.kmerlength += 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        if (kmerlength % 4 == 1) {
+            for (int i = getLength() - 1; i > 0; i--) {
+                bytes[offset + i] = bytes[offset + i - 1];
+            }
+            bytes[offset] = (byte) (nextCode & 0x3);
+        } else {
+            bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength-1) % 4) << 1)));
+        }
+        clearLeadBit();
+    }
+
     protected void clearLeadBit() {
         if (kmerlength % 4 != 0) {
-            bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
-        }
-    }
-
-    public void set(KmerBytesWritable newData) {
-        if (kmerlength != newData.kmerlength){
-            throw new IllegalArgumentException("kmerSize is different, try to use VKmerBytesWritable instead");
-        }
-        if (kmerlength > 0 ){
-            set(newData.bytes, 0, newData.size);
-        }
-    }
-
-    public void set(byte[] newData, int offset, int length) {
-        if (kmerlength > 0){
-            System.arraycopy(newData, offset, bytes, 0, size);
+            bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
         }
     }
 
@@ -259,8 +339,9 @@
         if (this.kmerlength > 0) {
             if (this.bytes.length < this.size) {
                 this.bytes = new byte[this.size];
+                this.offset = 0;
             }
-            in.readFully(bytes, 0, size);
+            in.readFully(bytes, offset, size);
         }
     }
 
@@ -268,7 +349,7 @@
     public void write(DataOutput out) throws IOException {
         out.writeInt(kmerlength);
         if (kmerlength > 0) {
-            out.write(bytes, 0, size);
+            out.write(bytes, offset, size);
         }
     }
 
@@ -286,7 +367,7 @@
 
     @Override
     public String toString() {
-        return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), 0, this.getLength());
+        return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
     }
 
     public static class Comparator extends WritableComparator {
@@ -309,5 +390,4 @@
     static { // register this comparator
         WritableComparator.define(KmerBytesWritable.class, new Comparator());
     }
-
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
similarity index 88%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
index c00967f..9d458d2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
@@ -15,11 +15,11 @@
 
 package edu.uci.ics.genomix.type;
 
-public class VKmerBytesWritableFactory {
-    private VKmerBytesWritable kmer;
+public class KmerBytesWritableFactory {
+    private KmerBytesWritable kmer;
 
-    public VKmerBytesWritableFactory(int k) {
-        kmer = new VKmerBytesWritable(k);
+    public KmerBytesWritableFactory(int k) {
+        kmer = new KmerBytesWritable(k);
     }
 
     /**
@@ -30,8 +30,9 @@
      * @param array
      * @param start
      */
-    public VKmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
-        kmer.setByRead(k, array, start);
+    public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByRead(array, start);
         return kmer;
     }
 
@@ -42,8 +43,9 @@
      * @param array
      * @param start
      */
-    public VKmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
-        kmer.setByReadReverse(k, array, start);
+    public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByReadReverse(array, start);
         return kmer;
     }
 
@@ -57,7 +59,7 @@
      * @param kmerChain
      * @return LastKmer bytes array
      */
-    public VKmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
         if (lastK > kmerChain.getKmerLength()) {
             return null;
         }
@@ -93,7 +95,7 @@
      * @param kmerChain
      * @return FirstKmer bytes array
      */
-    public VKmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
         if (firstK > kmerChain.getKmerLength()) {
             return null;
         }
@@ -117,7 +119,7 @@
         return kmer;
     }
 
-    public VKmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
         if (startK + kSize > kmerChain.getKmerLength()) {
             return null;
         }
@@ -157,7 +159,7 @@
      *            : next neighbor in gene-code format
      * @return the merged Kmer, this K of this Kmer is k+1
      */
-    public VKmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+    public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
         this.kmer.reset(kmer.getKmerLength() + 1);
         for (int i = 1; i <= kmer.getLength(); i++) {
             this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
@@ -184,7 +186,7 @@
      *            : next neighbor in gene-code format
      * @return the merged Kmer,this K of this Kmer is k+1
      */
-    public VKmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+    public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
         this.kmer.reset(kmer.getKmerLength() + 1);
         int byteInMergedKmer = 0;
         if (kmer.getKmerLength() % 4 == 0) {
@@ -213,7 +215,7 @@
      *            : bytes array of next kmer
      * @return merged kmer, the new k is @preK + @nextK
      */
-    public VKmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+    public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
         kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
         int i = 1;
         for (; i <= preKmer.getLength(); i++) {
@@ -253,7 +255,7 @@
      *            : input genecode
      * @return new created kmer that shifted by afterCode, the K will not change
      */
-    public VKmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+    public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
         this.kmer.set(kmer);
         this.kmer.shiftKmerWithNextCode(afterCode);
         return this.kmer;
@@ -271,7 +273,7 @@
      *            : input genecode
      * @return new created kmer that shifted by preCode, the K will not change
      */
-    public VKmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+    public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
         this.kmer.set(kmer);
         this.kmer.shiftKmerWithPreCode(preCode);
         return this.kmer;
@@ -282,7 +284,7 @@
      * 
      * @param kmer
      */
-    public VKmerBytesWritable reverse(final KmerBytesWritable kmer) {
+    public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
         this.kmer.reset(kmer.getKmerLength());
 
         int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
deleted file mode 100644
index fab7001..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class KmerCountValue implements Writable {
-    private byte adjBitMap;
-    private byte count;
-
-    public KmerCountValue(byte bitmap, byte count) {
-        set(bitmap, count);
-    }
-
-    public KmerCountValue() {
-        adjBitMap = 0;
-        count = 0;
-    }
-
-    @Override
-    public void readFields(DataInput arg0) throws IOException {
-        adjBitMap = arg0.readByte();
-        count = arg0.readByte();
-    }
-
-    @Override
-    public void write(DataOutput arg0) throws IOException {
-        arg0.writeByte(adjBitMap);
-        arg0.writeByte(count);
-    }
-
-    @Override
-    public String toString() {
-        return GeneCode.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
-    }
-
-    public void set(byte bitmap, byte count) {
-        this.adjBitMap = bitmap;
-        this.count = count;
-    }
-
-    public byte getAdjBitMap() {
-        return adjBitMap;
-    }
-
-    public void setAdjBitMap(byte adjBitMap) {
-        this.adjBitMap = adjBitMap;
-    }
-
-    public byte getCount() {
-        return count;
-    }
-}
\ No newline at end of file
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
new file mode 100644
index 0000000..332b23b
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -0,0 +1,133 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class NodeWritable implements WritableComparable<NodeWritable> {
+    private PositionWritable nodeID;
+    private int countOfKmer;
+    private PositionListWritable incomingList;
+    private PositionListWritable outgoingList;
+    private KmerBytesWritable kmer;
+    
+    public NodeWritable(){
+        nodeID = new PositionWritable();
+        countOfKmer = 0;
+        incomingList = new PositionListWritable();
+        outgoingList = new PositionListWritable();
+        kmer = new KmerBytesWritable();
+    }
+
+    public NodeWritable(int kmerSize) {
+        nodeID = new PositionWritable();
+        countOfKmer = 0;
+        incomingList = new PositionListWritable();
+        outgoingList = new PositionListWritable();
+        kmer = new KmerBytesWritable(kmerSize);
+    }
+
+    public int getCount() {
+        return countOfKmer;
+    }
+
+    public void setCount(int count) {
+        this.countOfKmer = count;
+    }
+
+    public void setNodeID(PositionWritable ref) {
+        this.setNodeID(ref.getReadID(), ref.getPosInRead());
+    }
+
+    public void setNodeID(int readID, byte posInRead) {
+        nodeID.set(readID, posInRead);
+    }
+
+    public void setIncomingList(PositionListWritable incoming) {
+        incomingList.set(incoming);
+    }
+
+    public void setOutgoingList(PositionListWritable outgoing) {
+        outgoingList.set(outgoing);
+    }
+
+    public void reset() {
+        nodeID.set(0, (byte) 0);
+        incomingList.reset();
+        outgoingList.reset();
+        countOfKmer = 0;
+    }
+
+    public PositionListWritable getIncomingList() {
+        return incomingList;
+    }
+
+    public PositionListWritable getOutgoingList() {
+        return outgoingList;
+    }
+
+    public PositionWritable getNodeID() {
+        return nodeID;
+    }
+
+    public KmerBytesWritable getKmer() {
+        return kmer;
+    }
+
+    public void mergeNextWithinOneRead(NodeWritable nextNodeEntry) {
+        this.countOfKmer += 1;
+        this.outgoingList.set(nextNodeEntry.outgoingList);
+        kmer.mergeKmerWithNextCode(nextNodeEntry.kmer.getGeneCodeAtPosition(nextNodeEntry.kmer.getKmerLength() - 1));
+    }
+
+    public void set(NodeWritable node) {
+        this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
+        this.countOfKmer = node.countOfKmer;
+        this.incomingList.set(node.getIncomingList());
+        this.outgoingList.set(node.getOutgoingList());
+        this.kmer.set(node.kmer);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.nodeID.readFields(in);
+        this.countOfKmer = in.readInt();
+        this.incomingList.readFields(in);
+        this.outgoingList.readFields(in);
+        this.kmer.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.nodeID.write(out);
+        out.writeInt(this.countOfKmer);
+        this.incomingList.write(out);
+        this.outgoingList.write(out);
+        this.kmer.write(out);
+    }
+
+    @Override
+    public int compareTo(NodeWritable other) {
+        return this.nodeID.compareTo(other.nodeID);
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeID.hashCode();
+    }
+    
+    @Override
+    public String toString(){
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('(');
+        sbuilder.append(nodeID.toString()).append(',');
+        sbuilder.append(countOfKmer).append(',');
+        sbuilder.append(incomingList.toString()).append(',');
+        sbuilder.append(incomingList.toString()).append(',');
+        sbuilder.append(kmer.toString()).append(')');
+        return sbuilder.toString();
+    }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
new file mode 100644
index 0000000..a3b5c84
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -0,0 +1,159 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionListWritable implements Writable, Iterable<PositionWritable> {
+    protected byte[] storage;
+    protected int offset;
+    protected int valueCount;
+    protected static final byte[] EMPTY = {};
+
+    protected PositionWritable posIter = new PositionWritable();
+
+    public PositionListWritable() {
+        this.storage = EMPTY;
+        this.valueCount = 0;
+        this.offset = 0;
+    }
+
+    public PositionListWritable(int count, byte[] data, int offset) {
+        setNewReference(count, data, offset);
+    }
+
+    public void setNewReference(int count, byte[] data, int offset) {
+        this.valueCount = count;
+        this.storage = data;
+        this.offset = offset;
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+
+    protected int getCapacity() {
+        return storage.length - offset;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (storage.length - offset > 0) {
+                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+            }
+            storage = new_data;
+            offset = 0;
+        }
+    }
+
+    public PositionWritable getPosition(int i) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such positions");
+        }
+        posIter.setNewReference(storage, offset + i * PositionWritable.LENGTH);
+        return posIter;
+    }
+
+    @Override
+    public Iterator<PositionWritable> iterator() {
+        Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
+
+            private int currentIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public PositionWritable next() {
+                return getPosition(currentIndex++);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove() is not supported"); // was a silent no-op stub
+            }
+        };
+        return it;
+    }
+
+    public void set(PositionListWritable list2) {
+        set(list2.valueCount, list2.storage, list2.offset);
+    }
+
+    public void set(int valueCount, byte[] newData, int offset) {
+        this.valueCount = valueCount;
+        setSize(valueCount * PositionWritable.LENGTH);
+        if (valueCount > 0) {
+            System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionWritable.LENGTH);
+        }
+    }
+
+    public void reset() {
+        valueCount = 0;
+    }
+
+    public void append(PositionWritable pos) {
+        setSize((1 + valueCount) * PositionWritable.LENGTH);
+        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+                * PositionWritable.LENGTH, pos.getLength());
+        valueCount += 1;
+    }
+
+    public void append(int readID, byte posInRead) {
+        setSize((1 + valueCount) * PositionWritable.LENGTH);
+        Marshal.putInt(readID, storage, offset + valueCount * PositionWritable.LENGTH);
+        storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
+        valueCount += 1;
+    }
+
+    public int getCountOfPosition() {
+        return valueCount;
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return valueCount * PositionWritable.LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt();
+        setSize(valueCount * PositionWritable.LENGTH);
+        in.readFully(storage, offset, valueCount * PositionWritable.LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(storage, offset, valueCount * PositionWritable.LENGTH);
+    }
+
+    @Override
+    public String toString(){
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('[');
+        for(PositionWritable pos : this){
+            sbuilder.append(pos.toString()).append(',');
+        }
+        // fix: on an empty list, setCharAt(length()-1) clobbered the '[' and returned "]"
+        if (valueCount > 0) { sbuilder.setCharAt(sbuilder.length()-1, ']'); } else { sbuilder.append(']'); }
+        return sbuilder.toString();
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
new file mode 100644
index 0000000..c7f24ec
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -0,0 +1,119 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionWritable implements WritableComparable<PositionWritable> {
+    protected byte[] storage;
+    protected int offset;
+    public static final int LENGTH = 5;
+    public static final int INTBYTES = 4;
+
+    public PositionWritable() {
+        storage = new byte[LENGTH];
+        offset = 0;
+    }
+
+    public PositionWritable(int readID, byte posInRead) {
+        this();
+        set(readID, posInRead);
+    }
+
+    public PositionWritable(byte[] storage, int offset) {
+        setNewReference(storage, offset);
+    }
+    
+    public void setNewReference(byte[] storage, int offset) {
+        this.storage = storage;
+        this.offset = offset;
+    }
+    
+    public void set(int readID, byte posInRead) {
+        Marshal.putInt(readID, storage, offset);
+        storage[offset + INTBYTES] = posInRead;
+    }
+
+    public int getReadID() {
+        return Marshal.getInt(storage, offset);
+    }
+
+    public byte getPosInRead() {
+        return storage[offset + INTBYTES];
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        in.readFully(storage, offset, LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(storage, offset, LENGTH);
+    }
+    
+    @Override
+    public int hashCode() {
+        return this.getReadID();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PositionWritable))
+            return false;
+        PositionWritable other = (PositionWritable) o;
+        return this.getReadID() == other.getReadID() && this.getPosInRead() == other.getPosInRead();
+    }
+
+    @Override
+    public int compareTo(PositionWritable other) {
+        int diff = Integer.compare(this.getReadID(), other.getReadID()); // raw subtraction can overflow int
+        if (diff == 0) {
+            return this.getPosInRead() - other.getPosInRead(); // byte operands: subtraction cannot overflow
+        }
+        return diff;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + Integer.toString(getReadID()) + "," + Integer.toString((int) getPosInRead()) + ")";
+    }
+
+    /** A raw-bytes Comparator for PositionWritable: readID (int) first, then posInRead (byte). */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(PositionWritable.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            int thisValue = Marshal.getInt(b1, s1);
+            int thatValue = Marshal.getInt(b2, s2);
+            int diff = Integer.compare(thisValue, thatValue); // raw subtraction can overflow int
+            if (diff == 0) {
+                return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+            }
+            return diff;
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(PositionWritable.class, new Comparator());
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
deleted file mode 100644
index abedad6..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type;
-
-public class VKmerBytesWritable extends KmerBytesWritable {
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    @Deprecated
-    public VKmerBytesWritable() {
-        super();
-    }
-
-    public VKmerBytesWritable(int k, byte[] storage) {
-        super(k, storage);
-    }
-
-    public VKmerBytesWritable(int k) {
-        super(k);
-    }
-
-    public VKmerBytesWritable(KmerBytesWritable other) {
-        super(other);
-    }
-
-    protected void setSize(int size) {
-        if (size > getCapacity()) {
-            setCapacity((size * 3 / 2));
-        }
-        this.size = size;
-    }
-
-    protected int getCapacity() {
-        return bytes.length;
-    }
-
-    protected void setCapacity(int new_cap) {
-        if (new_cap != getCapacity()) {
-            byte[] new_data = new byte[new_cap];
-            if (new_cap < size) {
-                size = new_cap;
-            }
-            if (size != 0) {
-                System.arraycopy(bytes, 0, new_data, 0, size);
-            }
-            bytes = new_data;
-        }
-    }
-
-    /**
-     * Read Kmer from read text into bytes array e.g. AATAG will compress as
-     * [0x000G, 0xATAA]
-     * 
-     * @param k
-     * @param array
-     * @param start
-     */
-    public void setByRead(int k, byte[] array, int start) {
-        reset(k);
-        super.setByRead(array, start);
-    }
-
-    /**
-     * Compress Reversed Kmer into bytes array AATAG will compress as
-     * [0x000A,0xATAG]
-     * 
-     * @param input
-     *            array
-     * @param start
-     *            position
-     */
-    public void setByReadReverse(int k, byte[] array, int start) {
-        reset(k);
-        super.setByReadReverse(array, start);
-    }
-
-    @Override
-    public void set(KmerBytesWritable newData) {
-        if (newData == null){
-            this.set(0,null,0,0);
-        }else{
-            this.set(newData.kmerlength, newData.bytes, 0, newData.size);
-        }
-    }
-
-    public void set(int k, byte[] newData, int offset, int length) {
-        reset(k);
-        if (k > 0 ){
-            System.arraycopy(newData, offset, bytes, 0, size);
-        }
-    }
-
-    /**
-     * Reset array by kmerlength
-     * 
-     * @param k
-     */
-    public void reset(int k) {
-        this.kmerlength = k;
-        setSize(0);
-        setSize(KmerUtil.getByteNumFromK(k));
-        clearLeadBit();
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
deleted file mode 100644
index 7aa05f5..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type.old;
-
-@Deprecated
-public class Kmer {
-
-    public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
-
-    public final static class GENE_CODE {
-
-        /**
-         * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
-         */
-        public static final byte A = 0;
-        public static final byte C = 1;
-        public static final byte G = 2;
-        public static final byte T = 3;
-
-        public static byte getCodeFromSymbol(byte ch) {
-            byte r = 0;
-            switch (ch) {
-                case 'A':
-                case 'a':
-                    r = A;
-                    break;
-                case 'C':
-                case 'c':
-                    r = C;
-                    break;
-                case 'G':
-                case 'g':
-                    r = G;
-                    break;
-                case 'T':
-                case 't':
-                    r = T;
-                    break;
-            }
-            return r;
-        }
-
-        public static byte getSymbolFromCode(byte code) {
-            if (code > 3) {
-                return '!';
-            }
-            return GENE_SYMBOL[code];
-        }
-
-        public static byte getAdjBit(byte t) {
-            byte r = 0;
-            switch (t) {
-                case 'A':
-                case 'a':
-                    r = 1 << A;
-                    break;
-                case 'C':
-                case 'c':
-                    r = 1 << C;
-                    break;
-                case 'G':
-                case 'g':
-                    r = 1 << G;
-                    break;
-                case 'T':
-                case 't':
-                    r = 1 << T;
-                    break;
-            }
-            return r;
-        }
-
-        /**
-         * It works for path merge.
-         * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
-         * 
-         * @param t
-         *            the neighbor code in BitMap
-         * @return the genecode
-         */
-        public static byte getGeneCodeFromBitMap(byte t) {
-            switch (t) {
-                case 1 << A:
-                    return A;
-                case 1 << C:
-                    return C;
-                case 1 << G:
-                    return G;
-                case 1 << T:
-                    return T;
-            }
-            return -1;
-        }
-
-        public static byte mergePreNextAdj(byte pre, byte next) {
-            return (byte) (pre << 4 | (next & 0x0f));
-        }
-
-        public static String getSymbolFromBitMap(byte code) {
-            int left = (code >> 4) & 0x0F;
-            int right = code & 0x0F;
-            StringBuilder str = new StringBuilder();
-            for (int i = A; i <= T; i++) {
-                if ((left & (1 << i)) != 0) {
-                    str.append((char) GENE_SYMBOL[i]);
-                }
-            }
-            str.append('|');
-            for (int i = A; i <= T; i++) {
-                if ((right & (1 << i)) != 0) {
-                    str.append((char) GENE_SYMBOL[i]);
-                }
-            }
-            return str.toString();
-        }
-    }
-
-    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
-        StringBuilder strKmer = new StringBuilder();
-        int byteId = keyStart + keyLength - 1;
-        byte currentbyte = keyData[byteId];
-        for (int geneCount = 0; geneCount < k; geneCount++) {
-            if (geneCount % 4 == 0 && geneCount > 0) {
-                currentbyte = keyData[--byteId];
-            }
-            strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
-        }
-        return strKmer.toString();
-    }
-
-    public static int getByteNumFromK(int k) {
-        int x = k / 4;
-        if (k % 4 != 0) {
-            x += 1;
-        }
-        return x;
-    }
-
-    /**
-     * Compress Kmer into bytes array AATAG will compress as [0x000G, 0xATAA]
-     * 
-     * @param kmer
-     * @param input
-     *            array
-     * @param start
-     *            position
-     * @return initialed kmer array
-     */
-    public static byte[] compressKmer(int k, byte[] array, int start) {
-        final int byteNum = getByteNumFromK(k);
-        byte[] bytes = new byte[byteNum];
-
-        byte l = 0;
-        int bytecount = 0;
-        int bcount = byteNum - 1;
-        for (int i = start; i < start + k; i++) {
-            byte code = GENE_CODE.getCodeFromSymbol(array[i]);
-            l |= (byte) (code << bytecount);
-            bytecount += 2;
-            if (bytecount == 8) {
-                bytes[bcount--] = l;
-                l = 0;
-                bytecount = 0;
-            }
-        }
-        if (bcount >= 0) {
-            bytes[0] = l;
-        }
-        return bytes;
-    }
-
-    /**
-     * Shift Kmer to accept new input
-     * 
-     * @param kmer
-     * @param bytes
-     *            Kmer Array
-     * @param c
-     *            Input new gene character
-     * @return the shiftout gene, in gene code format
-     */
-    public static byte moveKmer(int k, byte[] kmer, byte c) {
-        int byteNum = kmer.length;
-        byte output = (byte) (kmer[byteNum - 1] & 0x03);
-        for (int i = byteNum - 1; i > 0; i--) {
-            byte in = (byte) (kmer[i - 1] & 0x03);
-            kmer[i] = (byte) (((kmer[i] >>> 2) & 0x3f) | (in << 6));
-        }
-        int pos = ((k - 1) % 4) << 1;
-        byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
-        kmer[0] = (byte) (((kmer[0] >>> 2) & 0x3f) | code);
-        return (byte) (1 << output);
-    }
-
-    public static byte reverseKmerByte(byte k) {
-        int x = (((k >> 2) & 0x33) | ((k << 2) & 0xcc));
-        return (byte) (((x >> 4) & 0x0f) | ((x << 4) & 0xf0));
-    }
-
-    public static byte[] reverseKmer(int k, byte[] kmer) {
-        byte[] reverseKmer = new byte[kmer.length];
-
-        int curPosAtKmer = ((k - 1) % 4) << 1;
-        int curByteAtKmer = 0;
-
-        int curPosAtReverse = 0;
-        int curByteAtReverse = reverseKmer.length - 1;
-        reverseKmer[curByteAtReverse] = 0;
-        for (int i = 0; i < k; i++) {
-            byte gene = (byte) ((kmer[curByteAtKmer] >> curPosAtKmer) & 0x03);
-            reverseKmer[curByteAtReverse] |= gene << curPosAtReverse;
-            curPosAtReverse += 2;
-            if (curPosAtReverse >= 8) {
-                curPosAtReverse = 0;
-                reverseKmer[--curByteAtReverse] = 0;
-            }
-            curPosAtKmer -= 2;
-            if (curPosAtKmer < 0) {
-                curPosAtKmer = 6;
-                curByteAtKmer++;
-            }
-        }
-
-        return reverseKmer;
-    }
-
-    /**
-     * Compress Reversed Kmer into bytes array AATAG will compress as
-     * [0x000A,0xATAG]
-     * 
-     * @param kmer
-     * @param input
-     *            array
-     * @param start
-     *            position
-     * @return initialed kmer array
-     */
-    public static byte[] compressKmerReverse(int k, byte[] array, int start) {
-        final int byteNum = getByteNumFromK(k);
-        byte[] bytes = new byte[byteNum];
-
-        byte l = 0;
-        int bytecount = 0;
-        int bcount = byteNum - 1;
-        for (int i = start + k - 1; i >= 0; i--) {
-            byte code = GENE_CODE.getCodeFromSymbol(array[i]);
-            l |= (byte) (code << bytecount);
-            bytecount += 2;
-            if (bytecount == 8) {
-                bytes[bcount--] = l;
-                l = 0;
-                bytecount = 0;
-            }
-        }
-        if (bcount >= 0) {
-            bytes[0] = l;
-        }
-        return bytes;
-    }
-
-    /**
-     * Shift Kmer to accept new input
-     * 
-     * @param kmer
-     * @param bytes
-     *            Kmer Array
-     * @param c
-     *            Input new gene character
-     * @return the shiftout gene, in gene code format
-     */
-    public static byte moveKmerReverse(int k, byte[] kmer, byte c) {
-        int pos = ((k - 1) % 4) << 1;
-        byte output = (byte) ((kmer[0] >> pos) & 0x03);
-        for (int i = 0; i < kmer.length - 1; i++) {
-            byte in = (byte) ((kmer[i + 1] >> 6) & 0x03);
-            kmer[i] = (byte) ((kmer[i] << 2) | in);
-        }
-        // (k%4) * 2
-        if (k % 4 != 0) {
-            kmer[0] &= (1 << ((k % 4) << 1)) - 1;
-        }
-        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
-        return (byte) (1 << output);
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
deleted file mode 100644
index 7a96d2f..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
-    private static final int LENGTH_BYTES = 4;
-    private static final byte[] EMPTY_BYTES = {};
-    private byte size;
-    private byte[] bytes;
-
-    public KmerBytesWritable() {
-        this(EMPTY_BYTES);
-    }
-
-    public KmerBytesWritable(byte[] bytes) {
-        this.bytes = bytes;
-        this.size = (byte) bytes.length;
-    }
-
-    @Override
-    public byte[] getBytes() {
-        return bytes;
-    }
-
-    @Deprecated
-    public byte[] get() {
-        return getBytes();
-    }
-
-    @Override
-    public int getLength() {
-        return (int) size;
-    }
-
-    @Deprecated
-    public int getSize() {
-        return getLength();
-    }
-
-    public void setSize(byte size) {
-        if ((int) size > getCapacity()) {
-            setCapacity((byte) (size * 3 / 2));
-        }
-        this.size = size;
-    }
-
-    public int getCapacity() {
-        return bytes.length;
-    }
-
-    public void setCapacity(byte new_cap) {
-        if (new_cap != getCapacity()) {
-            byte[] new_data = new byte[new_cap];
-            if (new_cap < size) {
-                size = new_cap;
-            }
-            if (size != 0) {
-                System.arraycopy(bytes, 0, new_data, 0, size);
-            }
-            bytes = new_data;
-        }
-    }
-
-    public void set(KmerBytesWritable newData) {
-        set(newData.bytes, (byte) 0, newData.size);
-    }
-
-    public void set(byte[] newData, byte offset, byte length) {
-        setSize((byte) 0);
-        setSize(length);
-        System.arraycopy(newData, offset, bytes, 0, size);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        setSize((byte) 0); // clear the old data
-        setSize(in.readByte());
-        in.readFully(bytes, 0, size);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeByte(size);
-        out.write(bytes, 0, size);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object right_obj) {
-        if (right_obj instanceof KmerBytesWritable)
-            return super.equals(right_obj);
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        StringBuffer sb = new StringBuffer(3 * size);
-        for (int idx = 0; idx < (int) size; idx++) {
-            // if not the first, put a blank separator in
-            if (idx != 0) {
-                sb.append(' ');
-            }
-            String num = Integer.toHexString(0xff & bytes[idx]);
-            // if it is only one digit, add a leading 0.
-            if (num.length() < 2) {
-                sb.append('0');
-            }
-            sb.append(num);
-        }
-        return sb.toString();
-    }
-
-    public static class Comparator extends WritableComparator {
-        public Comparator() {
-            super(KmerBytesWritable.class);
-        }
-
-        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
-        }
-    }
-
-    static { // register this comparator
-        WritableComparator.define(KmerBytesWritable.class, new Comparator());
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
deleted file mode 100644
index a4b1bec..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.util.Arrays;
-
-@Deprecated
-public class KmerUtil {
-
-    public static int countNumberOfBitSet(int i) {
-        int c = 0;
-        for (; i != 0; c++) {
-            i &= i - 1;
-        }
-        return c;
-    }
-
-    public static int inDegree(byte bitmap) {
-        return countNumberOfBitSet((bitmap >> 4) & 0x0f);
-    }
-
-    public static int outDegree(byte bitmap) {
-        return countNumberOfBitSet(bitmap & 0x0f);
-    }
-
-    /**
-     * Get last kmer from kmer-chain.
-     * e.g. kmerChain is AAGCTA, if k =5, it will
-     * return AGCTA
-     * 
-     * @param k
-     * @param kInChain
-     * @param kmerChain
-     * @return LastKmer bytes array
-     */
-    public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
-        if (k > kInChain) {
-            return null;
-        }
-        if (k == kInChain) {
-            return kmerChain.clone();
-        }
-        int byteNum = Kmer.getByteNumFromK(k);
-        byte[] kmer = new byte[byteNum];
-
-        /** from end to start */
-        int byteInChain = length - 1 - (kInChain - k) / 4;
-        int posInByteOfChain = ((kInChain - k) % 4) << 1; // *2
-        int byteInKmer = byteNum - 1;
-        for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
-            kmer[byteInKmer] = (byte) ((0xff & kmerChain[offset + byteInChain]) >> posInByteOfChain);
-            kmer[byteInKmer] |= ((kmerChain[offset + byteInChain - 1] << (8 - posInByteOfChain)));
-        }
-
-        /** last kmer byte */
-        if (byteInKmer == 0) {
-            kmer[0] = (byte) ((kmerChain[offset] & 0xff) >> posInByteOfChain);
-        }
-        return kmer;
-    }
-
-    /**
-     * Get first kmer from kmer-chain e.g. kmerChain is AAGCTA, if k=5, it will
-     * return AAGCT
-     * 
-     * @param k
-     * @param kInChain
-     * @param kmerChain
-     * @return FirstKmer bytes array
-     */
-    public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
-        if (k > kInChain) {
-            return null;
-        }
-        if (k == kInChain) {
-            return kmerChain.clone();
-        }
-        int byteNum = Kmer.getByteNumFromK(k);
-        byte[] kmer = new byte[byteNum];
-
-        int i = 1;
-        for (; i < kmer.length; i++) {
-            kmer[kmer.length - i] = kmerChain[offset + length - i];
-        }
-        int posInByteOfChain = (k % 4) << 1; // *2
-        if (posInByteOfChain == 0) {
-            kmer[0] = kmerChain[offset + length - i];
-        } else {
-            kmer[0] = (byte) (kmerChain[offset + length - i] & ((1 << posInByteOfChain) - 1));
-        }
-        return kmer;
-    }
-
-    /**
-     * Merge kmer with next neighbor in gene-code format.
-     * The k of new kmer will increase by 1
-     * e.g. AAGCT merge with A => AAGCTA
-     * 
-     * @param k
-     *            :input k of kmer
-     * @param kmer
-     *            : input bytes of kmer
-     * @param nextCode
-     *            : next neighbor in gene-code format
-     * @return the merged Kmer, this K of this Kmer is k+1
-     */
-    public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
-        int byteNum = length;
-        if (k % 4 == 0) {
-            byteNum++;
-        }
-        byte[] mergedKmer = new byte[byteNum];
-        for (int i = 1; i <= length; i++) {
-            mergedKmer[mergedKmer.length - i] = kmer[offset + length - i];
-        }
-        if (mergedKmer.length > length) {
-            mergedKmer[0] = (byte) (nextCode & 0x3);
-        } else {
-            mergedKmer[0] = (byte) (kmer[offset] | ((nextCode & 0x3) << ((k % 4) << 1)));
-        }
-        return mergedKmer;
-    }
-
-    /**
-     * Merge kmer with previous neighbor in gene-code format.
-     * The k of new kmer will increase by 1
-     * e.g. AAGCT merge with A => AAAGCT
-     * 
-     * @param k
-     *            :input k of kmer
-     * @param kmer
-     *            : input bytes of kmer
-     * @param preCode
-     *            : next neighbor in gene-code format
-     * @return the merged Kmer,this K of this Kmer is k+1
-     */
-    public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
-        int byteNum = length;
-        byte[] mergedKmer = null;
-        int byteInMergedKmer = 0;
-        if (k % 4 == 0) {
-            byteNum++;
-            mergedKmer = new byte[byteNum];
-            mergedKmer[0] = (byte) ((kmer[offset] >> 6) & 0x3);
-            byteInMergedKmer++;
-        } else {
-            mergedKmer = new byte[byteNum];
-        }
-        for (int i = 0; i < length - 1; i++, byteInMergedKmer++) {
-            mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + i] << 2) | ((kmer[offset + i + 1] >> 6) & 0x3));
-        }
-        mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + length - 1] << 2) | (preCode & 0x3));
-        return mergedKmer;
-    }
-
-    /**
-     * Merge two kmer to one kmer
-     * e.g. ACTA + ACCGT => ACTAACCGT
-     * 
-     * @param preK
-     *            : previous k of kmer
-     * @param kmerPre
-     *            : bytes array of previous kmer
-     * @param nextK
-     *            : next k of kmer
-     * @param kmerNext
-     *            : bytes array of next kmer
-     * @return merged kmer, the new k is @preK + @nextK
-     */
-    public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
-            byte[] kmerNext, int offsetNext, int lengthNext) {
-        int byteNum = Kmer.getByteNumFromK(preK + nextK);
-        byte[] mergedKmer = new byte[byteNum];
-        int i = 1;
-        for (; i <= lengthPre; i++) {
-            mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
-        }
-        if (i > 1) {
-            i--;
-        }
-        if (preK % 4 == 0) {
-            for (int j = 1; j <= lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = kmerNext[offsetNext + lengthNext - j];
-            }
-        } else {
-            int posNeedToMove = ((preK % 4) << 1);
-            mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
-            for (int j = 1; j < lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
-                        + lengthNext - j - 1] << posNeedToMove));
-            }
-            if ((nextK % 4) * 2 + posNeedToMove > 8) {
-                mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
-            }
-        }
-        return mergedKmer;
-    }
-
-    /**
-     * Safely shifted the kmer forward without change the input kmer
-     * e.g. AGCGC shift with T => GCGCT
-     * 
-     * @param k
-     *            : kmer length
-     * @param kmer
-     *            : input kmer
-     * @param afterCode
-     *            : input genecode
-     * @return new created kmer that shifted by afterCode, the K will not change
-     */
-    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
-        Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
-        return shifted;
-    }
-
-    /**
-     * Safely shifted the kmer backward without change the input kmer
-     * e.g. AGCGC shift with T => TAGCG
-     * 
-     * @param k
-     *            : kmer length
-     * @param kmer
-     *            : input kmer
-     * @param preCode
-     *            : input genecode
-     * @return new created kmer that shifted by preCode, the K will not change
-     */
-    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
-        Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
-        return shifted;
-    }
-
-    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
-        if (pos >= k) {
-            return -1;
-        }
-        int posByte = pos / 4;
-        int shift = (pos % 4) << 1;
-        return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
-    }
-}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
similarity index 87%
rename from genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
rename to genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
index 7c4c675..1eb58c8 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
@@ -13,29 +13,19 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.example.kmer;
+package edu.uci.ics.genomix.data.test;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import edu.uci.ics.genomix.type.GeneCode;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
 
-public class VKmerBytesWritableFactoryTest {
+public class KmerBytesWritableFactoryTest {
     static byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
 
-    VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(8);
-
-    @Test
-    public void TestDegree() {
-        Assert.assertTrue(GeneCode.inDegree((byte) 0xff) == 4);
-        Assert.assertTrue(GeneCode.outDegree((byte) 0xff) == 4);
-        Assert.assertTrue(GeneCode.inDegree((byte) 0x3f) == 2);
-        Assert.assertTrue(GeneCode.outDegree((byte) 0x01) == 1);
-        Assert.assertTrue(GeneCode.inDegree((byte) 0x01) == 0);
-    }
+    KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(8);
 
     @Test
     public void TestGetLastKmer() {
@@ -49,7 +39,7 @@
             lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
         }
-        VKmerBytesWritable vlastKmer;
+        KmerBytesWritable vlastKmer;
         for (int i = 8; i > 0; i--) {
             vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
@@ -70,7 +60,7 @@
             firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
         }
-        VKmerBytesWritable vfirstKmer;
+        KmerBytesWritable vfirstKmer;
         for (int i = 8; i > 0; i--) {
             vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
@@ -84,7 +74,7 @@
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
-        VKmerBytesWritable subKmer;
+        KmerBytesWritable subKmer;
         for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
             for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
                 subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
@@ -168,7 +158,7 @@
         KmerBytesWritable kmer5 = new KmerBytesWritable(7);
         kmer5.setByRead(array, 0);
         String text5 = "AGCTGAC";
-        VKmerBytesWritable kmer6 = new VKmerBytesWritable(9);
+        KmerBytesWritable kmer6 = new KmerBytesWritable(9);
         kmer6.setByRead(9, array, 1);
         String text6 = "GCTGACCGT";
         merged = kmerFactory.mergeTwoKmer(kmer5, kmer6);
@@ -188,14 +178,14 @@
 
     @Test
     public void TestShift() {
-        VKmerBytesWritable kmer = new VKmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
+        KmerBytesWritable kmer = new KmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
         String text = "AGCTGACCG";
         Assert.assertEquals(text, kmer.toString());
 
-        VKmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
+        KmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
         Assert.assertEquals(text, kmer.toString());
         Assert.assertEquals("GCTGACCGA", kmerForward.toString());
-        VKmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
+        KmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
         Assert.assertEquals(text, kmer.toString());
         Assert.assertEquals("CAGCTGACC", kmerBackward.toString());
 
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
similarity index 82%
rename from genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
rename to genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index faee509..b7dd8f1 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.example.kmer;
+package edu.uci.ics.genomix.data.test;
 
 import junit.framework.Assert;
 
@@ -90,4 +90,21 @@
         }
     }
 
+    @Test
+    public void TestMergeNext() {
+        KmerBytesWritable kmer = new KmerBytesWritable(9);
+        byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
+        kmer.setByRead(array, 0);
+        Assert.assertEquals("AGCTGACCG", kmer.toString());
+
+        String text = "AGCTGACCG";
+        for (int i = 0; i < 10; i++) {
+            for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+                kmer.mergeKmerWithNextCode(x);
+                text = text + (char) GeneCode.GENE_SYMBOL[x];
+                Assert.assertEquals(text, kmer.toString());
+            }
+        }
+    }
+
 }
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java
new file mode 100644
index 0000000..19f2f39
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.data.test;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class PositionListWritableTest {
+
+    @Test
+    public void TestInitial() {
+        PositionListWritable plist = new PositionListWritable();
+        Assert.assertEquals(plist.getCountOfPosition(), 0);
+
+        for (int i = 0; i < 200; i++) {
+            plist.append(i, (byte) i);
+            Assert.assertEquals(i, plist.getPosition(i).getReadID());
+            Assert.assertEquals((byte) i, plist.getPosition(i).getPosInRead());
+            Assert.assertEquals(i + 1, plist.getCountOfPosition());
+        }
+        int i = 0;
+        for (PositionWritable pos : plist) {
+            Assert.assertEquals(i, pos.getReadID());
+            Assert.assertEquals((byte) i, pos.getPosInRead());
+            i++;
+        }
+        
+        byte [] another = new byte [plist.getLength()*2];
+        int start = 20;
+        System.arraycopy(plist.getByteArray(), 0, another, start, plist.getLength());
+        PositionListWritable plist2 = new PositionListWritable(plist.getCountOfPosition(),another,start);
+        for( i = 0; i < plist2.getCountOfPosition(); i++){
+            Assert.assertEquals(plist.getPosition(i), plist2.getPosition(i));
+        }
+    }
+    
+}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
new file mode 100644
index 0000000..13c6c0d
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.genomix.data.test;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class PositionWritableTest {
+
+    @Test
+    public void TestInitial() {
+        PositionWritable pos = new PositionWritable();
+        pos = new PositionWritable(3, (byte) 1);
+        Assert.assertEquals(pos.getReadID(), 3);
+        Assert.assertEquals(pos.getPosInRead(), 1);
+
+        byte[] start = new byte[256];
+        for (int i = 0; i < 128; i++) {
+            Marshal.putInt(i, start, i);
+            start[i + PositionWritable.INTBYTES] = (byte) (i / 2);
+            pos = new PositionWritable(start, i);
+            Assert.assertEquals(pos.getReadID(), i);
+            Assert.assertEquals(pos.getPosInRead(), (byte) (i / 2));
+            pos.set(-i, (byte) (i / 4));
+            Assert.assertEquals(pos.getReadID(), -i);
+            Assert.assertEquals(pos.getPosInRead(), (byte) (i / 4));
+            pos.setNewReference(start, i);
+            Assert.assertEquals(pos.getReadID(), -i);
+            Assert.assertEquals(pos.getPosInRead(), (byte) (i / 4));
+
+        }
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
new file mode 100644
index 0000000..591e3c7
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingMapper extends MapReduceBase implements
+        Mapper<KmerBytesWritable, PositionListWritable, IntWritable, LineBasedmappingWritable> {
+    IntWritable  numLine = new IntWritable();
+    LineBasedmappingWritable lineBasedWriter = new LineBasedmappingWritable();
+    @Override
+    public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<IntWritable, LineBasedmappingWritable> output,
+            Reporter reporter) throws IOException {
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
new file mode 100644
index 0000000..4b971a7
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -0,0 +1,86 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingReducer extends MapReduceBase implements
+        Reducer<IntWritable, LineBasedmappingWritable, NodeWritable, NullWritable> {
+
+/*    public ArrayList<LineBasedmappingWritable> lineElementsSet = new ArrayList<LineBasedmappingWritable>();
+    public Position outputVerID = new Position();
+    public VertexAdjacentWritable outputAdjacentList = new VertexAdjacentWritable();
+    public PositionList srcVtexAdjList = new PositionList();
+    public PositionList desVtexAdjList = new PositionList();
+    public VertexIDListWritable srcAdjListWritable = new VertexIDListWritable();
+    public VKmerBytesWritable desKmer = new VKmerBytesWritable(1);
+    public VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+    public VKmerBytesWritable srcKmer = new VKmerBytesWritable(1);*/
+    @Override
+    public void reduce(IntWritable key, Iterator<LineBasedmappingWritable> values,
+            OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
+/*        while (values.hasNext()) {
+            lineElementsSet.add(values.next());
+        }
+        int[] orderLineTable = new int[lineElementsSet.size()];
+        for (int i = 0; i < lineElementsSet.size(); i++) {
+            int posInInvertedIndex = lineElementsSet.get(i).getPosInInvertedIndex();
+            orderLineTable[lineElementsSet.get(i).getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex)] = i;
+        }
+        //the first node in this read
+        int posInInvertedIndex = lineElementsSet.get(orderLineTable[0]).getPosInInvertedIndex();
+        outputVerID.set(
+                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
+                (byte) 0);
+        desVtexAdjList.set(lineElementsSet.get(orderLineTable[1]).getAdjVertexList().get());
+        for (int i = 0; i < desVtexAdjList.getUsedSize(); i++) {
+            if (desVtexAdjList.getPosinReadListElement(i) == (byte) 0) {
+                srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
+            }
+        }
+        srcVtexAdjList.addELementToList(key.get(), (byte) 1);
+        outputVerID.set(
+                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
+                (byte) 0);
+        srcAdjListWritable.set(srcVtexAdjList);
+        outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[0]).getVkmer());
+        output.collect(outputVerID, outputAdjacentList);
+        //srcVtexAdjList reset!!!!
+
+        for (int i = 1; i < lineElementsSet.size(); i++) {
+            desVtexAdjList.set(lineElementsSet.get(orderLineTable[i + 1]).getAdjVertexList().get());
+            boolean flag = false;
+            for (int j = 0; j < desVtexAdjList.getUsedSize(); j++) {
+                if (desVtexAdjList.getPosinReadListElement(j) == (byte) 0) {
+                    srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
+                    flag = true;
+                }
+            }
+            if (flag == true) {
+                //doesn't merge
+                srcVtexAdjList.addELementToList(key.get(), (byte) (i + 1));
+                outputVerID.set(
+                        lineElementsSet.get(orderLineTable[i]).getAdjVertexList().get()
+                                .getReadListElement(posInInvertedIndex), lineElementsSet.get(orderLineTable[i])
+                                .getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex));
+                srcAdjListWritable.set(srcVtexAdjList);
+                outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[i]).getVkmer());
+            }
+            else {
+                //merge
+                desKmer.set(kmerFactory.getFirstKmerFromChain(1, lineElementsSet.get(orderLineTable[i+1]).getVkmer()));
+                srcKmer.set(lineElementsSet.get(orderLineTable[i]).getVkmer());
+                lineElementsSet.get(orderLineTable[i+1]).getVkmer().set(kmerFactory.mergeTwoKmer(srcKmer, desKmer));
+                orderLineTable[i+1] = orderLineTable[i];
+            }
+        }*/
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
new file mode 100644
index 0000000..d3c2ff4
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GraphInvertedIndexBuildingMapper extends MapReduceBase implements
+        Mapper<LongWritable, Text, KmerBytesWritable, PositionWritable> {
+    
+    public static int KMER_SIZE;
+    public PositionWritable outputVertexID;
+    public KmerBytesWritable outputKmer;
+
+    @Override
+    public void configure(JobConf job) {
+        KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+        outputVertexID = new PositionWritable();
+        outputKmer = new KmerBytesWritable(KMER_SIZE);
+    }
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, PositionWritable> output,
+            Reporter reporter) throws IOException {
+        String geneLine = value.toString();
+        /** first kmer */
+        byte[] array = geneLine.getBytes();
+        outputKmer.setByRead(array, 0);
+        outputVertexID.set((int)key.get(), (byte)0);
+        output.collect(outputKmer, outputVertexID);
+        /** middle kmer */
+        for (int i = KMER_SIZE; i < array.length - 1; i++) {
+            GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
+            outputVertexID.set((int)key.get(), (byte)(i - KMER_SIZE + 1));
+            output.collect(outputKmer, outputVertexID);
+        }
+        /** last kmer */
+        GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
+        outputVertexID.set((int)key.get(), (byte)(array.length - 1 + 1));
+        output.collect(outputKmer, outputVertexID);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
new file mode 100644
index 0000000..72827f2
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings({ "deprecation", "unused" })
+public class GraphInvertedIndexBuildingReducer extends MapReduceBase implements
+        Reducer<KmerBytesWritable, PositionWritable, KmerBytesWritable, PositionListWritable> {
+    PositionListWritable outputlist = new PositionListWritable();
+    @Override
+    public void reduce(KmerBytesWritable key, Iterator<PositionWritable> values,
+            OutputCollector<KmerBytesWritable, PositionListWritable> output, Reporter reporter) throws IOException {
+        while (values.hasNext()) {
+            outputlist.append(values.next());
+        }
+        output.collect(key, outputlist);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
new file mode 100644
index 0000000..1e44903
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+public class LineBasedmappingWritable extends PositionListWritable{
+    byte posInRead;
+    
+    public LineBasedmappingWritable() {
+        super();
+        this.posInRead = -1;        
+    }
+
+    public LineBasedmappingWritable(int count, byte [] data, int offset, byte posInRead) {       
+        super(count, data, offset);
+        this.posInRead = posInRead;
+    }
+    
+    public void set(byte posInRead, PositionListWritable right) {
+        super.set(right);
+        this.posInRead = posInRead;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        this.posInRead = in.readByte();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeByte(this.posInRead);
+    }
+    
+    public int getPosInInvertedIndex() {
+        return this.posInRead;
+    }
+}
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ca5cb61..ebc49f2 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -261,5 +261,6 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		
 	</dependencies>
 </project>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
deleted file mode 100644
index ea70fb0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.dataflow.aggregators;

-

-import java.io.DataOutput;

-import java.io.IOException;

-

-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;

-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

-

-/**

- * sum

- */

-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

-    private static final long serialVersionUID = 1L;

-

-    public DistributedMergeLmerAggregateFactory() {

-    }

-

-    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

-        private static final int MAX = 127;

-

-        @Override

-        public void reset() {

-        }

-

-        @Override

-        public void close() {

-            // TODO Auto-generated method stub

-

-        }

-

-        @Override

-        public AggregateState createAggregateStates() {

-            return new AggregateState(new Object() {

-            });

-        }

-

-        protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {

-            int tupleOffset = accessor.getTupleStartOffset(tIndex);

-            int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

-            int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-            byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

-            return data;

-        }

-

-        @Override

-        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-                throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            byte count = getField(accessor, tIndex, 2);

-

-            DataOutput fieldOutput = tupleBuilder.getDataOutput();

-            try {

-                fieldOutput.writeByte(bitmap);

-                tupleBuilder.addFieldEndOffset();

-                fieldOutput.writeByte(count);

-                tupleBuilder.addFieldEndOffset();

-            } catch (IOException e) {

-                throw new HyracksDataException("I/O exception when initializing the aggregator.");

-            }

-        }

-

-        @Override

-        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

-                int stateTupleIndex, AggregateState state) throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            short count = getField(accessor, tIndex, 2);

-

-            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

-            int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

-            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);

-            int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;

-            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;

-

-            byte[] data = stateAccessor.getBuffer().array();

-

-            bitmap |= data[bitoffset];

-            count += data[countoffset];

-            if (count >= MAX) {

-                count = (byte) MAX;

-            }

-            data[bitoffset] = bitmap;

-            data[countoffset] = (byte) count;

-        }

-

-        @Override

-        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                AggregateState state) throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            byte count = getField(accessor, tIndex, 2);

-            DataOutput fieldOutput = tupleBuilder.getDataOutput();

-            try {

-                fieldOutput.writeByte(bitmap);

-                tupleBuilder.addFieldEndOffset();

-                fieldOutput.writeByte(count);

-                tupleBuilder.addFieldEndOffset();

-            } catch (IOException e) {

-                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-            }

-

-        }

-

-        @Override

-        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                AggregateState state) throws HyracksDataException {

-            outputPartialResult(tupleBuilder, accessor, tIndex, state);

-        }

-

-    }

-

-    @Override

-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

-            throws HyracksDataException {

-        return new DistributeAggregatorDescriptor();

-    }

-

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
deleted file mode 100644
index 87c0207..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
-    private static final int MAX = 127;
-
-    @Override
-    public void reset() {
-    }
-
-    @Override
-    public void close() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public AggregateState createAggregateStates() {
-        return new AggregateState(new Object() {
-        });
-    }
-
-    protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
-        int tupleOffset = accessor.getTupleStartOffset(tIndex);
-        int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-        int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
-        byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
-        return data;
-    }
-
-    @Override
-    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        byte count = 1;
-
-        DataOutput fieldOutput = tupleBuilder.getDataOutput();
-        try {
-            fieldOutput.writeByte(bitmap);
-            tupleBuilder.addFieldEndOffset();
-            fieldOutput.writeByte(count);
-            tupleBuilder.addFieldEndOffset();
-        } catch (IOException e) {
-            throw new HyracksDataException("I/O exception when initializing the aggregator.");
-        }
-    }
-
-    @Override
-    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-            int stateTupleIndex, AggregateState state) throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        short count = 1;
-
-        int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
-        int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
-        int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
-        int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
-        int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
-        byte[] data = stateAccessor.getBuffer().array();
-
-        bitmap |= data[bitoffset];
-        count += data[countoffset];
-        if (count >= MAX) {
-            count = (byte) MAX;
-        }
-        data[bitoffset] = bitmap;
-        data[countoffset] = (byte) count;
-    }
-
-    @Override
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        byte count = getField(accessor, tIndex, 2);
-        DataOutput fieldOutput = tupleBuilder.getDataOutput();
-        try {
-            fieldOutput.writeByte(bitmap);
-            tupleBuilder.addFieldEndOffset();
-            fieldOutput.writeByte(count);
-            tupleBuilder.addFieldEndOffset();
-        } catch (IOException e) {
-            throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
-        }
-
-    }
-
-    @Override
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        outputPartialResult(tupleBuilder, accessor, tIndex, state);
-    }
-
-};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
deleted file mode 100644
index b5eb70f..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.dataflow.aggregators;

-

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

-

-/**

- * count

- */

-public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

-    private static final long serialVersionUID = 1L;

-

-    public MergeKmerAggregateFactory() {

-    }

-

-    @Override

-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

-            throws HyracksDataException {

-        return new LocalAggregatorDescriptor();

-    }

-

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
index ebf1282..3826f9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.data.std.accessors;
+package edu.uci.ics.genomix.hyracks.data.accessors;
 
 import java.io.DataInput;
 import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
similarity index 92%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
index 34c29c7..a4cfd3b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
@@ -13,9 +13,9 @@
  * limitations under the License.

  */

 

-package edu.uci.ics.genomix.data.std.accessors;

+package edu.uci.ics.genomix.hyracks.data.accessors;

 

-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

 

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
index 8aaf380..eb0d6bb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.

  */

 

-package edu.uci.ics.genomix.data.std.accessors;

+package edu.uci.ics.genomix.hyracks.data.accessors;

 

 import java.nio.ByteBuffer;

 

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
index 83b9d12..44b0e10 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
@@ -13,9 +13,9 @@
  * limitations under the License.

  */

 

-package edu.uci.ics.genomix.data.std.accessors;

+package edu.uci.ics.genomix.hyracks.data.accessors;

 

-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;

 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;

 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

 

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
new file mode 100644
index 0000000..4d00731
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer(){
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                return IntegerSerializerDeserializer.getInt(bytes, start);
+            }
+            
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
new file mode 100644
index 0000000..d328bd1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        // TODO Auto-generated method stub
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+                int slotLength = accessor.getFieldSlotsLength();
+
+                ByteBuffer buf = accessor.getBuffer();
+
+                int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+                if (hash < 0) {
+                    hash = -(hash + 1);
+                }
+
+                return hash % nParts;
+            }
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
index 8febfa5..4ceda78 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -13,9 +13,9 @@
  * limitations under the License.

  */

 

-package edu.uci.ics.genomix.data.std.primitive;

+package edu.uci.ics.genomix.hyracks.data.primitive;

 

-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;

+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;

 import edu.uci.ics.hyracks.data.std.api.AbstractPointable;

 import edu.uci.ics.hyracks.data.std.api.IComparable;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..fcc61d5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+
+public class NodeReference extends NodeWritable{
+
+    public NodeReference(int kmerSize) {
+        super(kmerSize);
+        // TODO Auto-generated constructor stub
+    }
+ 
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..7b46fa5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionListReference extends PositionListWritable implements  IValueReference {
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..4c19595
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionReference extends PositionWritable implements IValueReference {
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
index ed1b926..fe44f51 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -13,7 +13,7 @@
  * limitations under the License.

  */

 

-package edu.uci.ics.genomix.dataflow;

+package edu.uci.ics.genomix.hyracks.dataflow;

 

 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
new file mode 100644
index 0000000..fe72a81
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -0,0 +1,173 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+
+    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recDesc;
+    }
+
+    private static final long serialVersionUID = 1L;
+    public static final int InputKmerField = 0;
+    public static final int InputPosListField = 1;
+
+    public static final int OutputReadIDField = 0;
+    public static final int OutputPosInReadField = 1;
+    public static final int OutputOtherReadIDListField = 2;
+    public static final int OutputKmerField = 3; // may not needed
+
+    /**
+     * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
+     * OtherReadID appears only when otherReadID.otherPos==0
+     */
+    public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+
+        private PositionReference positionEntry;
+        private ArrayBackedValueStorage posListEntry;
+        private ArrayBackedValueStorage zeroPositionCollection;
+        private ArrayBackedValueStorage noneZeroPositionCollection;
+
+        public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+            this.positionEntry = new PositionReference();
+            this.posListEntry = new ArrayBackedValueStorage();
+            this.zeroPositionCollection = new ArrayBackedValueStorage();
+            this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+            posListEntry.reset();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+                scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+            }
+        }
+
+        private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+                ArrayBackedValueStorage noneZeroPositionCollection2) {
+            zeroPositionCollection2.reset();
+            noneZeroPositionCollection2.reset();
+            byte[] data = accessor.getBuffer().array();
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewReference(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() == 0) {
+                    zeroPositionCollection2.append(positionEntry);
+                } else {
+                    noneZeroPositionCollection2.append(positionEntry);
+                }
+            }
+
+        }
+
+        private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+                ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+            byte[] data = accessor.getBuffer().array();
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewReference(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() != 0) {
+                    appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+                } else {
+                    appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+                }
+            }
+        }
+
+        private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+                ArrayTupleBuilder builder2) {
+            try {
+                builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
+                builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
+                //? ask Yingyi, if support empty bytes[]
+                if (posList2 == null) {
+                    builder2.addFieldEndOffset();
+                } else {
+                    builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+                }
+                // set kmer, may not useful
+                byte[] data = accessor.getBuffer().array();
+                int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputKmerField);
+                builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+
+                if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+                builder2.reset();
+            } catch (HyracksDataException e) {
+                throw new IllegalStateException(
+                        "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+
+    }
+
+    @Override
+    public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+                0), recordDescriptors[0]);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
new file mode 100644
index 0000000..46a92ec
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -0,0 +1,206 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = outRecDesc;
+        this.kmerSize = kmerSize;
+    }
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    private final int kmerSize;
+
+    public static final int InputReadIDField = 0;
+    public static final int InputInfoFieldStart = 1;
+
+    public static final int OutputNodeIDField = 0;
+    public static final int OutputCountOfKmerField = 1;
+    public static final int OutputIncomingField = 2;
+    public static final int OutputOutgoingField = 3;
+    public static final int OutputKmerBytesField = 4;
+
+    /**
+     * (ReadID, Storage[posInRead]={len, PositionList, len, Kmer})
+     * to (Position, LengthCount, InComingPosList, OutgoingPosList, Kmer)
+     */
+    public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        public static final int INT_LENGTH = 4;
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+
+        private NodeReference curNodeEntry;
+        private NodeReference nextNodeEntry;
+
+        public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc, int kmerSize) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+            curNodeEntry = new NodeReference(kmerSize);
+            nextNodeEntry = new NodeReference(kmerSize);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+            curNodeEntry.reset();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                generateNodeFromRead(i);
+            }
+        }
+
+        private void generateNodeFromRead(int tIndex) throws HyracksDataException {
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+            int readID = accessor.getBuffer().getInt(
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+            resetNode(curNodeEntry, readID, (byte) 0,
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), false);
+
+            for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
+                resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
+                        offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
+                if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
+                    curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
+                } else {
+                    curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+                    curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+                    outputNode(curNodeEntry);
+                    nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+                    curNodeEntry.set(nextNodeEntry);
+                }
+            }
+            outputNode(curNodeEntry);
+        }
+
+        private void outputNode(NodeReference node) throws HyracksDataException {
+            try {
+                builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
+                        .getLength());
+                builder.getDataOutput().writeInt(node.getCount());
+                builder.addFieldEndOffset();
+                builder.addField(node.getIncomingList().getByteArray(), node.getIncomingList().getStartOffset(), node
+                        .getIncomingList().getLength());
+                builder.addField(node.getOutgoingList().getByteArray(), node.getOutgoingList().getStartOffset(), node
+                        .getOutgoingList().getLength());
+                builder.addField(node.getKmer().getBytes(), node.getKmer().getOffset(), node.getKmer().getLength());
+
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                        throw new IllegalStateException("Failed to append tuplebuilder to frame");
+                    }
+                }
+                builder.reset();
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
+            }
+        }
+
+        private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean byRef) {
+            node.reset();
+            node.setNodeID(readID, posInRead);
+
+            ByteBuffer buffer = accessor.getBuffer();
+            int lengthOfPosition = buffer.getInt(offset);
+            if (lengthOfPosition % PositionReference.LENGTH != 0) {
+                throw new IllegalStateException("Size of PositionList is invalid ");
+            }
+            offset += INT_LENGTH;
+            if (posInRead == 0) {
+                setPositionList(node.getIncomingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+                        offset, byRef);
+            } else {
+                setPositionList(node.getOutgoingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+                        offset, byRef);
+            }
+            offset += lengthOfPosition;
+            int lengthKmer = buffer.getInt(offset);
+            if (node.getKmer().getLength() != lengthKmer) {
+                throw new IllegalStateException("Size of Kmer is invalid ");
+            }
+            setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, byRef);
+            node.setCount(1);
+        }
+
+        private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean byRef) {
+            if (byRef) {
+                kmer.setNewReference(array, offset);
+            } else {
+                kmer.set(array, offset);
+            }
+        }
+
+        private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset, boolean byRef) {
+            if (byRef) {
+                positionListWritable.setNewReference(count, array, offset);
+            } else {
+                positionListWritable.set(count, array, offset);
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+                recordDescriptors[0], kmerSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
similarity index 64%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 409b434..1bc2137 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -13,17 +13,18 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow;
 
 import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,14 +32,13 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
-;
-
 public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
     private static final long serialVersionUID = 1L;
-
+    private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
     private KmerBytesWritable kmer;
     private boolean bReversed;
 
@@ -58,76 +58,62 @@
 
             @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
-                String geneLine = value.toString(); // Read the Real Gene Line
+                String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
+                if (geneLine.length != 2) {
+                    return;
+                }
+                int readID = 0;
+                try {
+                    readID = Integer.parseInt(geneLine[0]);
+                } catch (NumberFormatException e) {
+                    LOG.warn("Invalid data");
+                    return;
+                }
+
                 Pattern genePattern = Pattern.compile("[AGCT]+");
-                Matcher geneMatcher = genePattern.matcher(geneLine);
+                Matcher geneMatcher = genePattern.matcher(geneLine[1]);
                 boolean isValid = geneMatcher.matches();
                 if (isValid) {
-                    SplitReads(geneLine.getBytes(), writer);
+                    SplitReads(readID, geneLine[1].getBytes(), writer);
                 }
             }
 
-            private void SplitReads(byte[] array, IFrameWriter writer) {
+            private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
                 /** first kmer */
                 int k = kmer.getKmerLength();
-                if (k >= array.length){
-                	return;
+                if (k >= array.length) {
+                    return;
                 }
                 kmer.setByRead(array, 0);
-                byte pre = 0;
-                byte next = GeneCode.getAdjBit(array[k]);
-                InsertToFrame(kmer, pre, next, writer);
+                InsertToFrame(kmer, readID, 0, writer);
 
                 /** middle kmer */
-                for (int i = k; i < array.length - 1; i++) {
-                    pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
-                    next = GeneCode.getAdjBit(array[i + 1]);
-                    InsertToFrame(kmer, pre, next, writer);
+                for (int i = k; i < array.length; i++) {
+                    kmer.shiftKmerWithNextChar(array[i]);
+                    InsertToFrame(kmer, readID, i - k + 1, writer);
                 }
 
-                /** last kmer */
-                pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
-                next = 0;
-                InsertToFrame(kmer, pre, next, writer);
-
                 if (bReversed) {
                     /** first kmer */
                     kmer.setByReadReverse(array, 0);
-                    next = 0;
-                    pre = GeneCode.getAdjBit(array[k]);
-                    InsertToFrame(kmer, pre, next, writer);
+                    InsertToFrame(kmer, -readID, array.length - k, writer);
                     /** middle kmer */
-                    for (int i = k; i < array.length - 1; i++) {
-                        next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
-                        pre = GeneCode.getAdjBit(array[i + 1]);
-                        InsertToFrame(kmer, pre, next, writer);
+                    for (int i = k; i < array.length; i++) {
+                        kmer.shiftKmerWithPreChar(array[i]);
+                        InsertToFrame(kmer, -readID, array.length - i - 1, writer);
                     }
-                    /** last kmer */
-                    next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
-                    pre = 0;
-                    InsertToFrame(kmer, pre, next, writer);
                 }
             }
 
-            /**
-             * At this graph building phase, we assume the kmer length are all
-             * the same Thus we didn't output those Kmer length
-             * 
-             * @param kmer
-             *            :input kmer
-             * @param pre
-             *            : pre neighbor code
-             * @param next
-             *            : next neighbor code
-             * @param writer
-             *            : output writer
-             */
-            private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
+            private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
                 try {
-                    byte adj = GeneCode.mergePreNextAdj(pre, next);
+                    if (posInRead > 127){
+                        throw new IllegalArgumentException ("Position id is beyond 127 at " + readID);
+                    }
                     tupleBuilder.reset();
                     tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
+                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
 
                     if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                             tupleBuilder.getSize())) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..7dc4947
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+            private PositionReference position = new PositionReference();
+
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
+
+            protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                inputVal.reset();
+                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                inputVal.append(position);
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                inputVal.append(position);
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
new file mode 100644
index 0000000..c7552ca
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
+    public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
+    public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
+    public static final int InputKmerField = MapKmerPositionToReadOperator.OutputKmerField;
+
+    public static final int OutputReadIDField = 0;
+    public static final int OutputPositionListField = 1;
+
+    public AggregateReadIDAggregateFactory() {
+    }
+
+    /**
+     * (ReadID,PosInRead,{OtherPosition,...},Kmer) to
+     * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+                storage.reset();
+                DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+
+                try {
+                    out.writeByte(posInRead);
+                    writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
+
+            }
+
+            private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
+                    throws IOException {
+                int len = accessor.getFieldLength(tIndex, idField);
+                out.writeInt(len);
+                out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+                DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+
+                try {
+                    out.writeByte(posInRead);
+                    writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..b98db56
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor (){
+
+            private PositionReference positionReEntry = new PositionReference();
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+                inputVal.reset();
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
+                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(positionReEntry);
+                }
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
+                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(positionReEntry);
+                }
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+                
+            }
+            
+        };
+        
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
new file mode 100644
index 0000000..2877ee6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -0,0 +1,173 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    private final int ValidPosCount;
+
+    public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+        ValidPosCount = getPositionCount(readLength, kmerLength);
+    }
+
+    public static int getPositionCount(int readLength, int kmerLength){
+        return readLength - kmerLength + 1;
+    }
+    public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+    public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+
+    public static final int BYTE_SIZE = 1;
+    public static final int INTEGER_SIZE = 4;
+
+    /**
+     * (ReadID, {(PosInRead,{OtherPosition..},Kmer) ...} to
+     * Aggregate as 
+     * (ReadID, Storage[posInRead]={PositionList,Kmer})
+     * 
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public AggregateState createAggregateStates() {
+                ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
+                for (int i = 0; i < storages.length; i++) {
+                    storages[i] = new ArrayBackedValueStorage();
+                }
+                return new AggregateState(Pair.of(storages, 0));
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                for (ArrayBackedValueStorage each : storages) {
+                    each.reset();
+                }
+                int count = 0;
+
+                int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+                ByteBuffer fieldBuffer = accessor.getBuffer();
+
+                while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+                    byte posInRead = fieldBuffer.get(fieldOffset);
+                    if (storages[posInRead].getLength() > 0) {
+                        throw new IllegalArgumentException("Reentering into an exist storage");
+                    }
+                    fieldOffset += BYTE_SIZE;
+                    // read poslist
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    // read Kmer
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    count++;
+                }
+                pair.setValue(count);
+            }
+
+            private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
+                    throws HyracksDataException {
+                int lengthPosList = fieldBuffer.getInt(fieldOffset);
+                try {
+                    storage.getDataOutput().writeInt(lengthPosList);
+                    fieldOffset += INTEGER_SIZE;
+                    storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
+                return lengthPosList + INTEGER_SIZE;
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                int count = pair.getRight();
+
+                int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+                ByteBuffer fieldBuffer = accessor.getBuffer();
+
+                while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+                    byte posInRead = fieldBuffer.get(fieldOffset);
+                    if (storages[posInRead].getLength() > 0) {
+                        throw new IllegalArgumentException("Reentering into an exist storage");
+                    }
+                    fieldOffset += BYTE_SIZE;
+                    // read poslist
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    // read Kmer
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    count++;
+                }
+                pair.setValue(count);
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                int count = pair.getRight();
+                if (count != storages.length) {
+                    throw new IllegalStateException("Final aggregate position number is invalid");
+                }
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                try {
+                    for (int i = 0; i < storages.length; i++) {
+                        fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
similarity index 87%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 405e109..0772eb3 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -24,9 +24,8 @@
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -43,7 +42,7 @@
 
     public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
         this.confFactory = new ConfFactory(conf);
-        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
     }
 
     public class TupleWriter implements ITupleWriter {
@@ -54,7 +53,6 @@
         ConfFactory cf;
         Writer writer = null;
 
-        KmerCountValue reEnterCount = new KmerCountValue();
         KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
 
         /**
@@ -66,12 +64,15 @@
                 byte[] kmer = tuple.getFieldData(0);
                 int keyStart = tuple.getFieldStart(0);
                 int keyLength = tuple.getFieldLength(0);
+                if (reEnterKey.getLength() > keyLength){
+                    throw new IllegalArgumentException("Not enough kmer bytes");
+                }
 
                 byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
                 byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-                reEnterCount.set(bitmap, count);
-                reEnterKey.set(kmer, keyStart, keyLength);
-                writer.append(reEnterKey, reEnterCount);
+//                reEnterCount.set(bitmap, count);
+                reEnterKey.set(kmer, keyStart);
+                writer.append(reEnterKey, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -81,7 +82,7 @@
         public void open(DataOutput output) throws HyracksDataException {
             try {
                 writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
-                        KmerCountValue.class, CompressionType.NONE, null);
+                        null, CompressionType.NONE, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index cfa7262..eac0045 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -13,12 +13,11 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.genomix.type.GeneCode;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,10 +41,10 @@
         @Override
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             try {
-                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
+                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0));
                 output.write(kmer.toString().getBytes());
                 output.writeByte('\t');
-                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+//                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
                 output.writeByte('\t');
                 output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
                 output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
new file mode 100644
index 0000000..272a21c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -0,0 +1,103 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public class NodeSequenceWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * Version identifier for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public static final int InputNodeIDField = MapReadToNodeOperator.OutputNodeIDField;
+    public static final int InputCountOfKmerField = MapReadToNodeOperator.OutputCountOfKmerField;
+    public static final int InputIncomingField = MapReadToNodeOperator.OutputIncomingField;
+    public static final int InputOutgoingField = MapReadToNodeOperator.OutputOutgoingField;
+    public static final int InputKmerBytesField = MapReadToNodeOperator.OutputKmerBytesField;
+
+    private ConfFactory confFactory;
+    private final int kmerlength;
+
+    public NodeSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+        this.confFactory = new ConfFactory(conf);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+    }
+
+    public class TupleWriter implements ITupleWriter {
+
+        public TupleWriter(ConfFactory confFactory) {
+            this.cf = confFactory;
+        }
+
+        ConfFactory cf;
+        Writer writer = null;
+        NodeWritable node = new NodeWritable(kmerlength);
+
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            try {
+                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, null,
+                        CompressionType.NONE, null);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            node.getNodeID().setNewReference(tuple.getFieldData(InputNodeIDField),
+                    tuple.getFieldStart(InputNodeIDField));
+            node.setCount(Marshal.getInt(tuple.getFieldData(InputCountOfKmerField),
+                    tuple.getFieldStart(InputCountOfKmerField)));
+            node.getIncomingList().setNewReference(tuple.getFieldLength(InputIncomingField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputIncomingField), tuple.getFieldStart(InputIncomingField));
+            node.getOutgoingList().setNewReference(tuple.getFieldLength(InputOutgoingField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputOutgoingField), tuple.getFieldStart(InputOutgoingField));
+
+            node.getKmer().setNewReference(node.getCount() + kmerlength - 1, tuple.getFieldData(InputKmerBytesField),
+                    tuple.getFieldStart(InputKmerBytesField));
+
+            try {
+                writer.append(node, null);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+
+        }
+
+    }
+
+    /**
+     * Input schema:
+     * (Position, LengthCount, IncomingPosList, OutgoingPosList, Kmer)
+     */
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter(confFactory);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..5b549b5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * Version identifier for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+    private final int initialKmerSize;
+
+    public NodeTextWriterFactory(int initialKmerSize) {
+        this.initialKmerSize = initialKmerSize;
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new ITupleWriter() {
+            NodeWritable node = new NodeWritable(initialKmerSize);
+
+            @Override
+            public void open(DataOutput output) throws HyracksDataException {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
+                node.setCount(Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)));
+                node.getIncomingList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputIncomingField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputIncomingField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputIncomingField));
+                node.getOutgoingList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputOutgoingField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputOutgoingField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputOutgoingField));
+
+                node.getKmer().setNewReference(node.getCount() + initialKmerSize - 1,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+                try {
+                    output.write(node.toString().getBytes());
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) throws HyracksDataException {
+                // TODO Auto-generated method stub
+
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 27d5e15..d958205 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.driver;
+package edu.uci.ics.genomix.hyracks.driver;
 
 import java.net.URL;
 import java.util.EnumSet;
@@ -24,9 +24,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
 
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
-import edu.uci.ics.genomix.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
similarity index 92%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
index be82477..4b0c984 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
 
 import java.io.IOException;
 
@@ -25,7 +25,9 @@
     public static final String JOB_NAME = "genomix";
 
     /** Kmers length */
-    public static final String KMER_LENGTH = "genomix.kmer";
+    public static final String KMER_LENGTH = "genomix.kmerlen";
+    /** Read length */
+    public static final String READ_LENGTH = "genomix.readlen";
     /** Frame Size */
     public static final String FRAME_SIZE = "genomix.framesize";
     /** Frame Limit, hyracks need */
@@ -46,7 +48,8 @@
     public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
     public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
 
-    public static final int DEFAULT_KMER = 21;
+    public static final int DEFAULT_KMERLEN = 21;
+    public static final int DEFAULT_READLEN = 124;
     public static final int DEFAULT_FRAME_SIZE = 32768;
     public static final int DEFAULT_FRAME_LIMIT = 4096;
     public static final int DEFAULT_TABLE_SIZE = 10485767;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index b1f9a29..3563f93 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
 
 import java.util.UUID;
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..117b7e0
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDNormarlizedComputeFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenBrujinGraph extends JobGen {
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
+
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
+
+    JobConf job;
+    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private Scheduler scheduler;
+    private String[] ncNodeNames;
+
+    private int readLength;
+    private int kmerSize;
+    private int frameLimits;
+    private int frameSize;
+    private int tableSize;
+    private GroupbyType groupbyType;
+    private OutputFormat outputFormat;
+    private boolean bGenerateReversedKmer;
+
+    private void logDebug(String status) {
+        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+    }
+
+    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) {
+        super(job);
+        this.scheduler = scheduler;
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        logDebug("initialize");
+    }
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRed) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+                aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+                        tableSize), true);
+    }
+
+    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+            throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
+        Object[] obj = new Object[3];
+
+        switch (groupbyType) {
+            case EXTERNAL:
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        combineRed);
+                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+                        finalRec);
+                break;
+            case PRECLUSTER:
+            default:
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        combineRed);
+                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+                        finalRec);
+                break;
+        }
+        return obj;
+    }
+
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
+            throws HyracksDataException {
+        try {
+
+            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+
+            LOG.info("HDFS read into " + splits.length + " splits");
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
+                    new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
+        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        jobSpec.setFrameSize(frameSize);
+
+        // File input
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
+
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        logDebug("LocalKmerGroupby Operator");
+        connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("CrossKmerGroupby Operator");
+        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+
+        logDebug("Map Kmer to Read Operator");
+        // Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID, PosInRead, {OtherPosition,...}, Kmer)
+        RecordDescriptor readIDOutputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, null, null, null });
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("Group by Read Operator");
+        // (ReadID, {(PosInRead, {OtherPosition,...}, Kmer), ...})
+        RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        RecordDescriptor readIDFinalRec = new RecordDescriptor(
+                new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+        objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
+                new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
+        AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("Group by ReadID merger");
+        IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+
+        logDebug("Map ReadInfo to Node");
+        // Map (ReadID, [(PosList, Kmer), ...]) to (Node, IncomingList, OutgoingList, Kmer)
+        RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        // Output Kmer
+        ITupleWriterFactory kmerWriter = null;
+        ITupleWriterFactory nodeWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                kmerWriter = new KMerTextWriterFactory(kmerSize);
+                nodeWriter = new NodeTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                kmerWriter = new KMerSequenceWriterFactory(job);
+                nodeWriter = new NodeSequenceWriterFactory(job);
+                break;
+        }
+        logDebug("WriteOperator");
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+
+        // Output Node
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeNodeOperator);
+
+        if (groupbyType == GroupbyType.PRECLUSTER) {
+            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return jobSpec;
+    }
+
+    @Override
+    protected void initJobConfiguration() {
+        readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+        if (kmerSize % 2 == 0) {
+            kmerSize--;
+            conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
+        }
+        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+
+        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+
+        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+        if (type.equalsIgnoreCase("external")) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase("precluster")) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
+
+        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        job = new JobConf(conf);
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmerSize);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame size" + frameSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
similarity index 95%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
index a88fa79..adce3f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -13,17 +13,16 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
 
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.util.StatSumAggregateFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -65,7 +64,7 @@
     @Override
     protected void initJobConfiguration() {
         // TODO Auto-generated method stub
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
         hadoopjob = new JobConf(conf);
         hadoopjob.setInputFormat(SequenceFileInputFormat.class);
     }
@@ -97,7 +96,7 @@
         AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
         jobSpec.addRoot(writeDegree);
         jobSpec.addRoot(writeCount);
-        return null;
+        return jobSpec;
     }
 
     private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
@@ -107,7 +106,7 @@
 
             String[] readSchedule = scheduler.getLocationConstraints(splits);
             return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
-                    new StatReadsKeyValueParserFactory());
+                    null); //new StatReadsKeyValueParserFactory());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
index 832966b..b070b56 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
index 65303a8..f483a9c 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
 
 import java.io.DataOutput;
 import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
index 39ac60a..fb37056 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
 
 import java.io.DataOutput;
 import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
deleted file mode 100644
index 79b38e8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenBrujinGraph extends JobGen {
-    public enum GroupbyType {
-        EXTERNAL,
-        PRECLUSTER,
-        HYBRIDHASH,
-    }
-
-    public enum OutputFormat {
-        TEXT,
-        BINARY,
-    }
-
-    JobConf job;
-    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-    private Scheduler scheduler;
-    private String[] ncNodeNames;
-
-    private int kmers;
-    private int frameLimits;
-    private int frameSize;
-    private int tableSize;
-    private GroupbyType groupbyType;
-    private OutputFormat outputFormat;
-    private boolean bGenerateReversedKmer;
-
-    private AbstractOperatorDescriptor singleGrouper;
-    private IConnectorDescriptor connPartition;
-    private AbstractOperatorDescriptor crossGrouper;
-    private RecordDescriptor readOutputRec;
-    private RecordDescriptor combineOutputRec;
-
-    /** works for hybrid hashing */
-    private long inputSizeInRawRecords;
-    private long inputSizeInUniqueKeys;
-    private int recordSizeInBytes;
-    private int hashfuncStartLevel;
-
-    private void logDebug(String status) {
-        String names = "";
-        for (String str : ncNodeNames) {
-            names += str + " ";
-        }
-        LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
-    }
-
-    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
-            int numPartitionPerMachine) {
-        super(job);
-        this.scheduler = scheduler;
-        String[] nodes = new String[ncMap.size()];
-        ncMap.keySet().toArray(nodes);
-        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-        for (int i = 0; i < numPartitionPerMachine; i++) {
-            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
-        }
-        logDebug("initialize");
-    }
-
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
-                combineOutputRec, new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields,
-                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                        .of(KmerPointable.FACTORY) }), tableSize), true);
-    }
-
-    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
-            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
-            IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
-        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
-                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
-                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
-                combineOutputRec, true);
-    }
-
-    private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
-        int[] keyFields = new int[] { 0 }; // the id of grouped key
-
-        switch (groupbyType) {
-            case EXTERNAL:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-                crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
-                break;
-            case PRECLUSTER:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                        new KmerHashPartitioncomputerFactory(), keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
-                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                        new DistributedMergeLmerAggregateFactory(), combineOutputRec);
-                break;
-            case HYBRIDHASH:
-            default:
-                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
-                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
-                break;
-        }
-    }
-
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-
-            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
-
-            LOG.info("HDFS read into " + splits.length + " splits");
-            String[] readSchedule = scheduler.getLocationConstraints(splits);
-            String log = "";
-            for (String schedule : readSchedule) {
-                log += schedule + " ";
-            }
-            LOG.info("HDFS read schedule " + log);
-            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
-                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-
-        JobSpecification jobSpec = new JobSpecification();
-        readOutputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
-        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
-        jobSpec.setFrameSize(frameSize);
-
-        // File input
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
-        logDebug("Read Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
-        generateDescriptorbyType(jobSpec);
-        logDebug("SingleGroupby Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
-
-        logDebug("CrossGrouper Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
-        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
-
-        // Output
-        ITupleWriterFactory writer = null;
-        switch (outputFormat) {
-            case TEXT:
-                writer = new KMerTextWriterFactory(kmers);
-                break;
-            case BINARY:
-            default:
-                writer = new KMerSequenceWriterFactory(job);
-                break;
-        }
-        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
-        logDebug("WriteOperator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
-
-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
-        jobSpec.addRoot(writeOperator);
-
-        if (groupbyType == GroupbyType.PRECLUSTER) {
-            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        }
-        return jobSpec;
-    }
-
-    @Override
-    protected void initJobConfiguration() {
-
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-        if (kmers % 2 == 0) {
-            kmers--;
-            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
-        }
-        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
-        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
-        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
-        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
-        /** here read the different recordSize why ? */
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
-
-        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
-
-        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
-        if (type.equalsIgnoreCase("external")) {
-            groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase("precluster")) {
-            groupbyType = GroupbyType.PRECLUSTER;
-        } else {
-            groupbyType = GroupbyType.HYBRIDHASH;
-        }
-
-        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
-        if (output.equalsIgnoreCase("text")) {
-            outputFormat = OutputFormat.TEXT;
-        } else {
-            outputFormat = OutputFormat.BINARY;
-        }
-        job = new JobConf(conf);
-        LOG.info("Genomix Graph Build Configuration");
-        LOG.info("Kmer:" + kmers);
-        LOG.info("Groupby type:" + type);
-        LOG.info("Output format:" + output);
-        LOG.info("Frame limit" + frameLimits);
-        LOG.info("Frame size" + frameSize);
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
deleted file mode 100644
index 2fcca67..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
-            throws HyracksDataException {
-
-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputBuffer, true);
-
-        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
-
-            @Override
-            public void open(IFrameWriter writer) throws HyracksDataException {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
-                    throws HyracksDataException {
-                byte adjMap = value.getAdjBitMap();
-                byte count = value.getCount();
-                InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
-            }
-
-            @Override
-            public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-
-            private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
-                try {
-                    tupleBuilder.reset();
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
-
-                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy an record into a frame: the record size is too large.");
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        };
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
similarity index 60%
copy from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
copy to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 847272a..d41bcbe 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -1,19 +1,4 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
@@ -22,8 +7,6 @@
 import java.io.FileWriter;
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,18 +17,14 @@
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
 
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
 
-public class JobRunTest {
+public class JobRunStepByStepTest {
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
@@ -57,7 +36,6 @@
     private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
     private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
     private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
-
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
     private MiniDFSCluster dfsCluster;
 
@@ -82,7 +60,6 @@
         driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
                 edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
     }
-
     private void cleanupStores() throws IOException {
         FileUtils.forceMkdir(new File("teststore"));
         FileUtils.forceMkdir(new File("build"));
@@ -122,69 +99,7 @@
             dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
         }
     }
-
-    @Test
-    public void TestAll() throws Exception {
-        cleanUpReEntry();
-        TestExternalGroupby();
-        cleanUpReEntry();
-        TestPreClusterGroupby();
-        cleanUpReEntry();
-        TestHybridGroupby();
-        cleanUpReEntry();
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
-        TestExternalReversedGroupby();
-        cleanUpReEntry();
-        TestPreClusterReversedGroupby();
-        cleanUpReEntry();
-        TestHybridReversedGroupby();
-    }
-
-    public void TestExternalGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "external");
-        System.err.println("Testing ExternalGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestPreClusterGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
-        System.err.println("Testing PreClusterGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestHybridGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-        System.err.println("Testing HybridGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestExternalReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "external");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
-        System.err.println("Testing ExternalGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
-    public void TestPreClusterReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
-        System.err.println("Testing PreclusterGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
-    public void TestHybridReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
-        System.err.println("Testing HybridGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
+    
     private boolean checkResults(String expectedPath) throws Exception {
         File dumped = null;
         String format = conf.get(GenomixJob.OUTPUT_FORMAT);
@@ -199,9 +114,9 @@
             BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
             for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
                 String partname = "/part-" + i;
-                //				FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-                //						+ partname), FileSystem.getLocal(new Configuration()),
-                //						new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+                //              FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+                //                      + partname), FileSystem.getLocal(new Configuration()),
+                //                      new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
 
                 Path path = new Path(HDFS_OUTPUT_PATH + partname);
                 FileSystem dfs = FileSystem.get(conf);
@@ -212,9 +127,9 @@
 
                 //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                 KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
-                        GenomixJob.DEFAULT_KMER));
-                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
+                        GenomixJob.DEFAULT_KMERLEN));
+//                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                KmerBytesWritable value = null;
                 while (reader.next(key, value)) {
                     if (key == null || value == null) {
                         break;
@@ -242,5 +157,4 @@
     private void cleanupHDFS() throws Exception {
         dfsCluster.shutdown();
     }
-
 }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
similarity index 95%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
index 847272a..4a7c8aa 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
@@ -34,16 +34,14 @@
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
 
 public class JobRunTest {
     private static final String ACTUAL_RESULT_DIR = "actual";
@@ -212,9 +210,9 @@
 
                 //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                 KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
-                        GenomixJob.DEFAULT_KMER));
-                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
+                        GenomixJob.DEFAULT_KMERLEN));
+//                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                KmerBytesWritable value = null;
                 while (reader.next(key, value)) {
                     if (key == null || value == null) {
                         break;
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
similarity index 98%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index aa1f791..bcfdedd 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
 
 import java.io.BufferedReader;
 import java.io.File;