hyracks contrail graph building: partial completion
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
index 2787ef3..08cfa9d 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
@@ -16,34 +16,33 @@
     private KmerListWritable forwardReverseList;
     private KmerListWritable reverseForwardList;
     private KmerListWritable reverseReverseList;
-    private PositionWritable nodeId;
-    
+    private ReadIDWritable readId;
     public IntermediateNodeWritable(){
         forwardForwardList = new KmerListWritable();
         forwardReverseList = new KmerListWritable();
         reverseForwardList = new KmerListWritable();
         reverseReverseList = new KmerListWritable();
-        nodeId = new PositionWritable();
+        readId = new ReadIDWritable();
     }
     
     public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
+            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
         this();
         set(FFList, FRList, RFList, RRList, uniqueKey);
     }
     
     public void set(IntermediateNodeWritable node){
         set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList, 
-                node.reverseReverseList, node.nodeId);
+                node.reverseReverseList, node.readId);
     }
     
     public void set(KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
+            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
         this.forwardForwardList.set(FFList);
         this.forwardReverseList.set(FRList);
         this.reverseForwardList.set(RFList);
         this.reverseReverseList.set(RRList);
-        this.nodeId.set(uniqueKey);
+        this.readId.set(uniqueKey);
     }
 
     public void reset(int kmerSize) {
@@ -51,7 +50,7 @@
         forwardReverseList.reset();
         reverseForwardList.reset();
         reverseReverseList.reset();
-        nodeId.reset();
+        readId.reset();
     }
     
     public KmerListWritable getFFList() {
@@ -86,12 +85,12 @@
         this.reverseReverseList.set(reverseReverseList);
     }
 
-	public PositionWritable getNodeId() {
-        return nodeId;
+	public ReadIDWritable getreadId() {
+        return readId;
     }
 
-    public void setNodeId(PositionWritable nodeId) {
-        this.nodeId.set(nodeId);
+    public void setreadId(ReadIDWritable readId) {
+        this.readId.set(readId);
     }
 
     @Override
@@ -100,7 +99,7 @@
         this.forwardReverseList.readFields(in);
         this.reverseForwardList.readFields(in);
         this.reverseReverseList.readFields(in);
-        this.nodeId.readFields(in);
+        this.readId.readFields(in);
     }
 
     @Override
@@ -109,18 +108,18 @@
         this.forwardReverseList.write(out);
         this.reverseForwardList.write(out);
         this.reverseReverseList.write(out);
-        this.nodeId.write(out);
+        this.readId.write(out);
     }
 
     @Override
     public int compareTo(IntermediateNodeWritable other) {
         // TODO Auto-generated method stub
-        return this.nodeId.compareTo(other.nodeId);
+        return this.readId.compareTo(other.readId);
     }
     
     @Override
     public int hashCode() {
-        return this.nodeId.hashCode();
+        return this.readId.hashCode();
     }
     
     @Override
@@ -130,7 +129,7 @@
             return (this.forwardForwardList.equals(nw.forwardForwardList)
                     && this.forwardReverseList.equals(nw.forwardReverseList)
                     && this.reverseForwardList.equals(nw.reverseForwardList)
-                    && this.reverseReverseList.equals(nw.reverseReverseList) && (this.nodeId.equals(nw.nodeId)));
+                    && this.reverseReverseList.equals(nw.reverseReverseList) && (this.readId.equals(nw.readId)));
         }
         return false;
     }
@@ -139,7 +138,7 @@
     public String toString() {
         StringBuilder sbuilder = new StringBuilder();
         sbuilder.append('(');
-        sbuilder.append(nodeId.toString()).append('\t');
+        sbuilder.append(readId.toString()).append('\t');
         sbuilder.append(forwardForwardList.toString()).append('\t');
         sbuilder.append(forwardReverseList.toString()).append('\t');
         sbuilder.append(reverseForwardList.toString()).append('\t');
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 96e1cc4..95bcaf8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -60,7 +60,7 @@
      * Initial Kmer space by kmerlength
      * 
      * @param k
-     *            kmerlength
+     *            kmerlength
      */
     public KmerBytesWritable(int k) {
         this.kmerlength = k;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
deleted file mode 100644
index 00aa633..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ /dev/null
@@ -1,212 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-
-public class NodeWritable implements WritableComparable<NodeWritable>, Serializable{
-
-    private static final long serialVersionUID = 1L;
-    public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
-    
-    private PositionWritable nodeId;
-    private KmerListWritable forwardForwardList;
-    private KmerListWritable forwardReverseList;
-    private KmerListWritable reverseForwardList;
-    private KmerListWritable reverseReverseList;
-    private KmerBytesWritable kmer;
-    
-    
-    // merge/update directions
-    public static class DirectionFlag {
-        public static final byte DIR_FF = 0b00 << 0;
-        public static final byte DIR_FR = 0b01 << 0;
-        public static final byte DIR_RF = 0b10 << 0;
-        public static final byte DIR_RR = 0b11 << 0;
-        public static final byte DIR_MASK = 0b11 << 0;
-    }
-    
-    public NodeWritable() {
-        this(0);
-    }
-    
-    public NodeWritable(int kmerSize) {
-        nodeId = new PositionWritable();
-        forwardForwardList = new KmerListWritable();
-        forwardReverseList = new KmerListWritable();
-        reverseForwardList = new KmerListWritable();
-        reverseReverseList = new KmerListWritable();
-        kmer = new KmerBytesWritable(kmerSize);
-    }
-    
-    public NodeWritable(PositionWritable nodeId, KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
-        this(kmer.getKmerLength());
-        set(nodeId, FFList, FRList, RFList, RRList, kmer);
-    }
-    
-    public void set(NodeWritable node){
-        set(node.nodeId, node.forwardForwardList, node.forwardReverseList, node.reverseForwardList, 
-                node.reverseReverseList, node.kmer);
-    }
-    
-    public void set(PositionWritable nodeId, KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
-        this.nodeId.set(nodeId);
-        this.forwardForwardList.set(FFList);
-        this.forwardReverseList.set(FRList);
-        this.reverseForwardList.set(RFList);
-        this.reverseReverseList.set(RRList);
-        this.kmer.set(kmer);
-    }
-
-    public void reset(int kmerSize) {
-        nodeId.reset();
-        forwardForwardList.reset();
-        forwardReverseList.reset();
-        reverseForwardList.reset();
-        reverseReverseList.reset();
-        kmer.reset(kmerSize);
-    }
-    
-    public PositionWritable getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(PositionWritable nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public KmerBytesWritable getKmer() {
-        return kmer;
-    }
-
-    public void setKmer(KmerBytesWritable kmer) {
-        this.kmer = kmer;
-    }
-    
-    public int getCount() {
-        return kmer.getKmerLength();
-    }
-    
-    public KmerListWritable getFFList() {
-        return forwardForwardList;
-    }
-
-    public KmerListWritable getFRList() {
-        return forwardReverseList;
-    }
-
-    public KmerListWritable getRFList() {
-        return reverseForwardList;
-    }
-
-    public KmerListWritable getRRList() {
-        return reverseReverseList;
-    }
-    
-    public KmerListWritable getListFromDir(byte dir) {
-        switch (dir & DirectionFlag.DIR_MASK) {
-            case DirectionFlag.DIR_FF:
-                return getFFList();
-            case DirectionFlag.DIR_FR:
-                return getFRList();
-            case DirectionFlag.DIR_RF:
-                return getRFList();
-            case DirectionFlag.DIR_RR:
-                return getRRList();
-            default:
-                throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
-        }
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-        this.nodeId.write(out);
-        this.forwardForwardList.write(out);
-        this.forwardReverseList.write(out);
-        this.reverseForwardList.write(out);
-        this.reverseReverseList.write(out);
-        this.kmer.write(out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        this.nodeId.readFields(in);
-        this.forwardForwardList.readFields(in);
-        this.forwardReverseList.readFields(in);
-        this.reverseForwardList.readFields(in);
-        this.reverseReverseList.readFields(in);
-        this.kmer.readFields(in);
-    }
-
-    @Override
-    public int compareTo(NodeWritable other) {
-        return this.kmer.compareTo(other.kmer);
-    }
-    
-    @Override
-    public int hashCode() {
-        return this.kmer.hashCode();
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof NodeWritable) {
-            NodeWritable nw = (NodeWritable) o;
-            return (this.nodeId.equals(nw.nodeId)
-                    && this.forwardForwardList.equals(nw.forwardForwardList)
-                    && this.forwardReverseList.equals(nw.forwardReverseList)
-                    && this.reverseForwardList.equals(nw.reverseForwardList)
-                    && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
-        }
-        return false;
-    }
-    
-    @Override
-    public String toString() {
-        StringBuilder sbuilder = new StringBuilder();
-        sbuilder.append('(');
-        sbuilder.append(nodeId.toString()).append('\t');
-        sbuilder.append(forwardForwardList.toString()).append('\t');
-        sbuilder.append(forwardReverseList.toString()).append('\t');
-        sbuilder.append(reverseForwardList.toString()).append('\t');
-        sbuilder.append(reverseReverseList.toString()).append('\t');
-        sbuilder.append(kmer.toString()).append(')');
-        return sbuilder.toString();
-    }
-
-    public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
-        this.forwardForwardList.set(nextNode.forwardForwardList);
-        this.forwardReverseList.set(nextNode.forwardReverseList);
-        kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
-    }
-
-    public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
-        this.reverseForwardList.set(preNode.reverseForwardList);
-        this.reverseReverseList.set(preNode.reverseReverseList);
-        kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
-    }
-    
-    public int inDegree() {
-        return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
-    }
-
-    public int outDegree() {
-        return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
-    }
-
-    /*
-     * Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1 
-     */
-    public boolean isPathNode() {
-        return inDegree() == 1 && outDegree() == 1;
-    }
-
-    public boolean isSimpleOrTerminalPath() {
-        return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
-    }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
new file mode 100644
index 0000000..7afc1ef
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.io.Writable;
+import edu.uci.ics.genomix.data.Marshal;
+public class ReadIDListWritable implements Writable, Iterable<ReadIDWritable>, Serializable{
+    private static final long serialVersionUID = 1L;
+    protected byte[] storage;
+    protected int offset;
+    protected int valueCount;
+    protected static final byte[] EMPTY = {};
+    
+    protected ReadIDWritable posIter = new ReadIDWritable();
+    
+    public ReadIDListWritable() {
+        this.storage = EMPTY;
+        this.valueCount = 0;
+        this.offset = 0;
+    }
+    
+    public ReadIDListWritable(int count, byte[] data, int offset) {
+        setNewReference(count, data, offset);
+    }
+    
+    public ReadIDListWritable(List<ReadIDWritable> posns) {
+        this();
+        setSize(posns.size() * ReadIDWritable.LENGTH);  // reserve space for all elements
+        for (ReadIDWritable p : posns) {
+            append(p);
+        }
+    }
+    
+    public void setNewReference(int count, byte[] data, int offset) {
+        this.valueCount = count;
+        this.storage = data;
+        this.offset = offset;
+    }
+    
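+    // Append one serialized readId entry at the end of the list, growing the backing array if needed.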
+    public void append(long readIDByte) {
+        setSize((1 + valueCount) * ReadIDWritable.LENGTH);
+        Marshal.putLong(readIDByte, storage, offset + valueCount * ReadIDWritable.LENGTH);
+        valueCount += 1;
+    }
+    
+    public void append(byte mateId, long readId){
+        append(ReadIDWritable.serialize(mateId, readId));
+    }
+    
+    public void append(ReadIDWritable pos) {
+        if (pos != null)
+            append(pos.deserialize());
+        else
+            throw new RuntimeException("The position to append must not be null!");
+    }
+    
+    /*
+     * Append the otherList to the end of myList
+     */
+    public void appendList(ReadIDListWritable otherList) {
+        if (otherList.valueCount > 0) {
+            setSize((valueCount + otherList.valueCount) * ReadIDWritable.LENGTH);
+            // copy contents of otherList into the end of my storage
+            System.arraycopy(otherList.storage, otherList.offset,
+                    storage, offset + valueCount * ReadIDWritable.LENGTH, 
+                    otherList.valueCount * ReadIDWritable.LENGTH);
+            valueCount += otherList.valueCount;
+        }
+    }
+    
+    public static int getCountByDataLength(int length) {
+        if (length % ReadIDWritable.LENGTH != 0) {
+            throw new IllegalArgumentException("Length of the ReadID list is invalid");
+        }
+        return length / ReadIDWritable.LENGTH;
+    }
+    
+    public void set(ReadIDListWritable otherList) {
+        set(otherList.valueCount, otherList.storage, otherList.offset);
+    }
+
+    public void set(int valueCount, byte[] newData, int offset) {
+        this.valueCount = valueCount;
+        setSize(valueCount * ReadIDWritable.LENGTH);
+        if (valueCount > 0) {
+            System.arraycopy(newData, offset, storage, this.offset, valueCount * ReadIDWritable.LENGTH);
+        }
+    }
+
+    public void reset() {
+        valueCount = 0;
+    }
+    
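+    // Ensure the backing array can hold at least `size` bytes, growing it to 1.5x the requested size when it cannot.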
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+    
+    protected int getCapacity() {
+        return storage.length - offset;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (storage.length - offset > 0) {
+                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+            }
+            storage = new_data;
+            offset = 0;
+        }
+    }
+    
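+    // Note: returns a shared, reused ReadIDWritable that points into this list's backing storage.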
+    public ReadIDWritable getPosition(int i) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such position");
+        }
+        posIter.setNewReference(storage, offset + i * ReadIDWritable.LENGTH);
+        return posIter;
+    }
+    
+    public void resetPosition(int i, long readIDByte) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such position");
+        }
+        Marshal.putLong(readIDByte, storage, offset + i * ReadIDWritable.LENGTH);
+    }
+    
+    public int getCountOfPosition() {
+        return valueCount;
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return valueCount * ReadIDWritable.LENGTH;
+    }
+    
+    @Override
+    public Iterator<ReadIDWritable> iterator() {
+        Iterator<ReadIDWritable> it = new Iterator<ReadIDWritable>() {
+
+            private int currentIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public ReadIDWritable next() {
+                return getPosition(currentIndex++);
+            }
+
+            @Override
+            public void remove() {
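+                // Shift the remaining entries left over the element returned by the previous next().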
+                if(currentIndex < valueCount)
+                    System.arraycopy(storage, offset + currentIndex * ReadIDWritable.LENGTH, 
+                          storage, offset + (currentIndex - 1) * ReadIDWritable.LENGTH, 
+                          (valueCount - currentIndex) * ReadIDWritable.LENGTH);
+                valueCount--;
+                currentIndex--;
+            }
+        };
+        return it;
+    }
+    
+    /*
+     * remove the first instance of @toRemove. Uses a linear scan.  Throws an exception if not in this list.
+     */
+    public void remove(ReadIDWritable toRemove, boolean ignoreMissing) {
+        Iterator<ReadIDWritable> posIterator = this.iterator();
+        while (posIterator.hasNext()) {
+            if(toRemove.equals(posIterator.next())) {
+                posIterator.remove();
+                return;
+            }
+        }
+        if (!ignoreMissing) {
+            throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+        }
+    }
+    
+    public void remove(ReadIDWritable toRemove) {
+        remove(toRemove, false);
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(storage, offset, valueCount * ReadIDWritable.LENGTH);
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt();
+        setSize(valueCount * ReadIDWritable.LENGTH);
+        in.readFully(storage, offset, valueCount * ReadIDWritable.LENGTH);
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('[');
+        for (ReadIDWritable pos : this) {
+            sbuilder.append(pos.toString());
+            sbuilder.append(',');
+        }
+        if (valueCount > 0) {
+            sbuilder.setCharAt(sbuilder.length() - 1, ']');
+        } else {
+            sbuilder.append(']');
+        }
+        return sbuilder.toString();
+    }
+    
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ReadIDListWritable))
+            return false;
+        ReadIDListWritable other = (ReadIDListWritable) o;
+        if (this.valueCount != other.valueCount)
+            return false;
+        for (int i = 0; i < this.valueCount; i++) {
+            if (!this.getPosition(i).equals(other.getPosition(i)))
+                return false;
+        }
+        return true;
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
new file mode 100644
index 0000000..b6c829a
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
@@ -0,0 +1,119 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.data.Marshal;
+
+public class ReadIDWritable implements WritableComparable<ReadIDWritable>, Serializable{
+    private static final long serialVersionUID = 1L;
+    protected byte[] storage;
+    protected int offset;
+    public static final int LENGTH = 6;
+    
+    public static final int totalBits = 48;
+    private static final int bitsForMate = 1;
+    private static final int readIdShift = bitsForMate;
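+    // The lowest bit of the packed long stores the mate ID; the read ID occupies the bits above it (see serialize/deserialize).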
+    
+    public ReadIDWritable() {
+        storage = new byte[LENGTH];
+        offset = 0;
+    }
+    
+    public ReadIDWritable(byte mateId, long readId){
+        this();
+        set(mateId, readId);
+    }
+    
+    public ReadIDWritable(byte[] storage, int offset) {
+        setNewReference(storage, offset);
+    }
+    
+    public void set(long readIDByte){
+        Marshal.putLong(readIDByte, storage, offset);
+    }
+    
+    public static long serialize(byte mateId, long readId) {
+        return (readId << 1) + (mateId & 0b1);
+    }
+    
+    public void set(byte mateId, long readId){
+        Marshal.putLong(serialize(mateId, readId), storage, offset);
+    }
+    
+    public void set(ReadIDWritable pos) {
+        set(pos.getMateId(),pos.getReadId());
+    }
+    
+    public void setNewReference(byte[] storage, int offset) {
+        this.storage = storage;
+        this.offset = offset;
+    }
+    
+    public void reset(){
+        storage = new byte[LENGTH];
+        offset = 0;
+    }
+    
+    public long deserialize(){
+        return Marshal.getLong(storage, offset);
+    }
+    
+    public byte getMateId(){
+        return (byte) (Marshal.getLong(storage, offset) & 0b1);
+    }
+    
+    public long getReadId(){
+        return Marshal.getLong(storage, offset) >>> readIdShift;
+    }
+    
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        in.readFully(storage, offset, LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(storage, offset, LENGTH);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ReadIDWritable))
+            return false;
+        ReadIDWritable other = (ReadIDWritable) o;
+        return this.deserialize() == other.deserialize();
+    }
+    
+    @Override
+    public int compareTo(ReadIDWritable other) {
+        return (this.deserialize() < other.deserialize()) ? -1 : ((this.deserialize() == other.deserialize()) ? 0 : 1);
+    }
+    
+    /*
+     * String of form "(readId_mate)" where mate is 1 or 2
+     */
+    @Override
+    public String toString() {
+        return "(" + this.getReadId() + "_" + (this.getMateId() + 1) + ")";
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..cec18bd
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+            private PositionReference position = new PositionReference();
+
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
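+            // Start a new group: clear the accumulated positions and append the position of the first tuple.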
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                inputVal.reset();
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                inputVal.append(position);
+
+                // make an empty field
+                tupleBuilder.addFieldEndOffset();
+            }
+
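+            // Append the position of each subsequent tuple in the same kmer group to the accumulated state.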
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                inputVal.append(position);
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
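+            // Emit the accumulated position bytes as a single field of the output tuple.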
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
new file mode 100644
index 0000000..76ca95a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
+import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
+import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
+import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
+import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+    public static enum Plan {
+        CHECK_KMERREADER,
+        OUTPUT_KMERHASHTABLE,
+        OUTPUT_MAP_KMER_TO_READ,
+        OUTPUT_GROUPBY_READID,
+        BUILD_DEBRUJIN_GRAPH,
+        BUILD_UNMERGED_GRAPH,
+    }
+
+    private static final String IS_PROFILING = "genomix.driver.profiling";
+    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
+
+    private int numPartitionPerMachine;
+
+    private IHyracksClientConnection hcc;
+    private Scheduler scheduler;
+
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+        try {
+            hcc = new HyracksConnection(ipAddress, port);
+            scheduler = new Scheduler(hcc.getNodeControllerInfos());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.numPartitionPerMachine = numPartitionPerMachine;
+    }
+
+    public void runJob(GenomixJobConf job) throws HyracksException {
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+    }
+
+    public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
+        /** add hadoop configurations */
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.addResource(hadoopHdfs);
+
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.profiling = profiling;
+        try {
+            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+            switch (planChoice) {
+                case BUILD_DEBRUJIN_GRAPH:
+                default:
+                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case OUTPUT_KMERHASHTABLE:
+                    jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case OUTPUT_MAP_KMER_TO_READ:
+                    jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case OUTPUT_GROUPBY_READID:
+                    jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case CHECK_KMERREADER:
+                    jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case BUILD_UNMERGED_GRAPH:
+                    jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
+            }
+
+            start = System.currentTimeMillis();
+            run(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private void run(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification createJob = jobGen.generateJob();
+            execute(createJob);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+
+    public static void main(String[] args) throws Exception {
+        GenomixJobConf jobConf = new GenomixJobConf();
+        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+        if (otherArgs.length < 4) {
+            System.err.println("Need <serverIP> <port> <input> <output>");
+            System.exit(-1);
+        }
+        String ipAddress = otherArgs[0];
+        int port = Integer.parseInt(otherArgs[1]);
+        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+        // FileInputFormat.setInputPaths(job, otherArgs[2]);
+        {
+            @SuppressWarnings("deprecation")
+            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+            jobConf.set("mapred.input.dir", path.toString());
+
+            @SuppressWarnings("deprecation")
+            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+            jobConf.set("mapred.output.dir", outputDir.toString());
+        }
+        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
new file mode 100644
index 0000000..ecd95d5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("deprecation")
+public class GenomixJobConf extends JobConf {
+
+    public static final String JOB_NAME = "genomix";
+
+    /** Kmer length */
+    public static final String KMER_LENGTH = "genomix.kmerlen";
+    /** Read length */
+    public static final String READ_LENGTH = "genomix.readlen";
+    /** Frame Size */
+    public static final String FRAME_SIZE = "genomix.framesize";
+    /** Frame limit, needed by Hyracks */
+    public static final String FRAME_LIMIT = "genomix.framelimit";
+    /** Table size, needed by Hyracks */
+    public static final String TABLE_SIZE = "genomix.tablesize";
+    /** Groupby types */
+    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+    /** Graph outputformat */
+    public static final String OUTPUT_FORMAT = "genomix.graph.output";
+    /** Get reversed Kmer Sequence */
+    public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+    /** Configurations used by the hybrid groupby function in the graph build phase */
+    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+    public static final int DEFAULT_KMERLEN = 21;
+    public static final int DEFAULT_READLEN = 124;
+    public static final int DEFAULT_FRAME_SIZE = 128 * 1024;
+    public static final int DEFAULT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_TABLE_SIZE = 10485767;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+    public static final boolean DEFAULT_REVERSED = true;
+
+    public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+    public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
+
+    public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+    public static final String GROUPBY_TYPE_EXTERNAL = "external";
+    public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+    public static final String OUTPUT_FORMAT_BINARY = "binary";
+    public static final String OUTPUT_FORMAT_TEXT = "text";
+
+    public GenomixJobConf() throws IOException {
+        super(new Configuration());
+    }
+
+    public GenomixJobConf(Configuration conf) throws IOException {
+        super(conf);
+    }
+
+    /**
+     * Set the kmer length
+     * 
+     * @param kmerlength
+     *            the desired kmer length
+     */
+    final public void setKmerLength(int kmerlength) {
+        setInt(KMER_LENGTH, kmerlength);
+    }
+
+    final public void setFrameSize(int frameSize) {
+        setInt(FRAME_SIZE, frameSize);
+    }
+
+    final public void setFrameLimit(int frameLimit) {
+        setInt(FRAME_LIMIT, frameLimit);
+    }
+
+    final public void setTableSize(int tableSize) {
+        setInt(TABLE_SIZE, tableSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
new file mode 100644
index 0000000..cc759a4
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
@@ -0,0 +1,148 @@
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
+
+    private static final long serialVersionUID = 1L;
+    public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
+    
+    private KmerListWritable forwardForwardList;
+    private KmerListWritable forwardReverseList;
+    private KmerListWritable reverseForwardList;
+    private KmerListWritable reverseReverseList;
+    private ReadIDWritable readId;
+    public IntermediateNodeWritable(){
+        forwardForwardList = new KmerListWritable();
+        forwardReverseList = new KmerListWritable();
+        reverseForwardList = new KmerListWritable();
+        reverseReverseList = new KmerListWritable();
+        readId = new ReadIDWritable();
+    }
+    
+    public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
+            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
+        this();
+        set(FFList, FRList, RFList, RRList, uniqueKey);
+    }
+    
+    public void set(IntermediateNodeWritable node){
+        set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList, 
+                node.reverseReverseList, node.readId);
+    }
+    
+    public void set(KmerListWritable FFList, KmerListWritable FRList,
+            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
+        this.forwardForwardList.set(FFList);
+        this.forwardReverseList.set(FRList);
+        this.reverseForwardList.set(RFList);
+        this.reverseReverseList.set(RRList);
+        this.readId.set(uniqueKey);
+    }
+
+    public void reset(int kmerSize) {
+        forwardForwardList.reset();
+        forwardReverseList.reset();
+        reverseForwardList.reset();
+        reverseReverseList.reset();
+        readId.reset();
+    }
+    
+    public KmerListWritable getFFList() {
+        return forwardForwardList;
+    }
+
+    public void setFFList(KmerListWritable forwardForwardList) {
+        this.forwardForwardList.set(forwardForwardList);
+    }
+
+    public KmerListWritable getFRList() {
+        return forwardReverseList;
+    }
+
+    public void setFRList(KmerListWritable forwardReverseList) {
+        this.forwardReverseList.set(forwardReverseList);
+    }
+
+    public KmerListWritable getRFList() {
+        return reverseForwardList;
+    }
+
+    public void setRFList(KmerListWritable reverseForwardList) {
+        this.reverseForwardList.set(reverseForwardList);
+    }
+
+    public KmerListWritable getRRList() {
+        return reverseReverseList;
+    }
+
+    public void setRRList(KmerListWritable reverseReverseList) {
+        this.reverseReverseList.set(reverseReverseList);
+    }
+
+	public ReadIDWritable getreadId() {
+        return readId;
+    }
+
+    public void setreadId(ReadIDWritable readId) {
+        this.readId.set(readId);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.forwardForwardList.readFields(in);
+        this.forwardReverseList.readFields(in);
+        this.reverseForwardList.readFields(in);
+        this.reverseReverseList.readFields(in);
+        this.readId.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.forwardForwardList.write(out);
+        this.forwardReverseList.write(out);
+        this.reverseForwardList.write(out);
+        this.reverseReverseList.write(out);
+        this.readId.write(out);
+    }
+
+    @Override
+    public int compareTo(IntermediateNodeWritable other) {
+        // TODO Auto-generated method stub
+        return this.readId.compareTo(other.readId);
+    }
+    
+    @Override
+    public int hashCode() {
+        return this.readId.hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof IntermediateNodeWritable) {
+            IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
+            return (this.forwardForwardList.equals(nw.forwardForwardList)
+                    && this.forwardReverseList.equals(nw.forwardReverseList)
+                    && this.reverseForwardList.equals(nw.reverseForwardList)
+                    && this.reverseReverseList.equals(nw.reverseReverseList) && (this.readId.equals(nw.readId)));
+        }
+        return false;
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('(');
+        sbuilder.append(readId.toString()).append('\t');
+        sbuilder.append(forwardForwardList.toString()).append('\t');
+        sbuilder.append(forwardReverseList.toString()).append('\t');
+        sbuilder.append(reverseForwardList.toString()).append('\t');
+        sbuilder.append(reverseReverseList.toString()).append(')');
+        return sbuilder.toString();
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
new file mode 100644
index 0000000..0ffdef2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    protected final ConfFactory confFactory;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    public JobGen(GenomixJobConf job) throws HyracksDataException {
+        this.confFactory = new ConfFactory(job);
+    }
+
+    public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
new file mode 100644
index 0000000..e280a7c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings("deprecation")
+public class JobGenBrujinGraph extends JobGen {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
+
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
+
+    protected ConfFactory hadoopJobConfFactory;
+    protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    protected String[] ncNodeNames;
+    protected String[] readSchedule;
+
+    protected int readLength;
+    protected int kmerSize;
+    protected int frameLimits;
+    protected int frameSize;
+    protected int tableSize;
+    protected GroupbyType groupbyType;
+    protected OutputFormat outputFormat;
+    protected boolean bGenerateReversedKmer;
+
+    protected void logDebug(String status) {
+        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+    }
+
+    public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job);
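+        // Repeat every NC node name numPartitionPerMachine times so that each machine hosts that many partitions.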
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        initJobConfiguration(scheduler);
+    }
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRec) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+                aggregator, merger, outRec, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+                        tableSize), true);
+    }
+
+    private Object[] generateAggregateDescriptorByType(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+            throws HyracksDataException {
+
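+        // obj[0] = local group-by operator, obj[1] = repartitioning connector, obj[2] = global (merging) group-by operator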
+        Object[] obj = new Object[3];
+
+        switch (groupbyType) {
+            case EXTERNAL:
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        combineRed);
+                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+                        finalRec);
+                break;
+            case PRECLUSTER:
+            default:
+
+                obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+                        combineRed);
+                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+                        finalRec);
+                jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+                break;
+        }
+        return obj;
+    }
+
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+                            kmerSize, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
+    }
+
+    public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // group by field 0, the kmer
+
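+        // sort each partition on the kmer field before handing the stream to the local group-by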
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                ReadsKeyValueParserFactory.readKmerOutputRec);
+        connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+
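+        // serde entries are left null: the fields are passed through downstream as raw binary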
+        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null, null, null, null});
+        jobSpec.setFrameSize(frameSize);
+
+        Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
+                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        logDebug("LocalKmerGroupby Operator");
+        connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("CrossKmerGroupby Operator");
+        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+        return kmerCrossAggregator;
+    }
+
+/*    public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) {
+        // Map (Kmer, {(ReadID,PosInRead),...}) into
+        // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+                MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapKmerToRead;
+    }*/
+
+/*    public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
+        // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                MapKmerPositionToReadOperator.readIDOutputRec);
+        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+
+        RecordDescriptor readIDFinalRec = new RecordDescriptor(
+                new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+        Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
+                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+                IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
+        AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("Group by ReadID merger");
+        IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+        return readCrossAggregator;
+    }*/
+
+/*    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) {
+        // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+        // OutgoingList, Kmer)
+
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+                MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapEachReadToNode;
+    }
+
+    public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
+        // Output Kmer
+        ITupleWriterFactory kmerWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                kmerWriter = new KMerTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+                break;
+        }
+        logDebug("WriteOperator");
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), kmerWriter);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return writeKmerOperator;
+    }
+
+    public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+        ITupleWriterFactory nodeWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                nodeWriter = new NodeTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
+                break;
+        }
+        logDebug("WriteOperator");
+        // Output Node
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), nodeWriter);
+        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return writeNodeOperator;
+    }
+*/
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        // logDebug("Write kmer to result");
+        // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+//        logDebug("Map Kmer to Read Operator");
+//        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+//        logDebug("Group by Read Operator");
+//        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+/*        logDebug("Generate final node");
+        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+        logDebug("Write node to result");
+        lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);*/
+
+        jobSpec.addRoot(lastOperator);
+        return jobSpec;
+    }
+
+    protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
+        Configuration conf = confFactory.getConf();
+        readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
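+        // keep k odd (an even k would allow a kmer to be its own reverse complement)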
+        if (kmerSize % 2 == 0) {
+            kmerSize--;
+            conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
+        }
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
+
+        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
+
+        String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
+
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        try {
+            hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+            readSchedule = scheduler.getLocationConstraints(splits);
+        } catch (IOException ex) {
+            throw new HyracksDataException(ex);
+        }
+
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmerSize);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame kmerByteSize" + frameSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..f0f72b9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
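+        // the aggregator concatenates the position lists (field 1) of all tuples that share the same kmer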
+        return new IAggregatorDescriptor() {
+
+            private PositionReference position = new PositionReference();
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                inputVal.reset();
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+                        1); offset += PositionReference.LENGTH) {
+                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
+                }
+                // add a placeholder field end offset so the caller sees a complete tuple
+                tupleBuilder.addFieldEndOffset();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+                        1); offset += PositionReference.LENGTH) {
+                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
+                }
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    if (inputVal.getLength() > frameSize / 2) {
+                        LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
+                    }
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+        };
+
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..2579029
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.IntermediateNodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.type.ReadIDWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+
+    public static final int OutputKmerField = 0;
+    public static final int OutputPosition = 1;
+
+    private final boolean bReversed;
+    private final int readLength;
+    private final int kmerSize;
+
+    public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+            null, null, null, null, null, null});
+
+    public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
+        bReversed = bGenerateReversed;
+        this.readLength = readlength;
+        this.kmerSize = k;
+    }
+
+    public static enum KmerDir {
+        FORWARD,
+        REVERSE,
+    }
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+
+        return new IKeyValueParser<LongWritable, Text>() {
+
+            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+            private PositionReference pos = new PositionReference();
+
+            private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+            private IntermediateNodeWritable outputNode = new IntermediateNodeWritable();
+            private ReadIDWritable readId = new ReadIDWritable();
+            private KmerListWritable kmerList = new KmerListWritable();
+
+            private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(kmerSize);
+            private KmerDir preKmerDir = KmerDir.FORWARD;
+            private KmerDir curKmerDir = KmerDir.FORWARD;
+            private KmerDir nextKmerDir = KmerDir.FORWARD;
+
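+            // mateId is fixed to 0 in this parser; it is combined with readID when building the node's read id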
+            byte mateId = (byte) 0;
+
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+                String[] geneLine = value.toString().split("\\t"); // each input line is: readID <tab> sequence
+                if (geneLine.length != 2) {
+                    return;
+                }
+                int readID = 0;
+                try {
+                    readID = Integer.parseInt(geneLine[0]);
+                } catch (NumberFormatException e) {
+                    LOG.warn("Invalid data ");
+                    return;
+                }
+
+                Pattern genePattern = Pattern.compile("[AGCT]+");
+                Matcher geneMatcher = genePattern.matcher(geneLine[1]);
+                boolean isValid = geneMatcher.matches();
+                if (isValid) {
+                    if (geneLine[1].length() != readLength) {
+                        LOG.warn("Invalid readlength at: " + readID);
+                        return;
+                    }
+                    SplitReads(readID, geneLine[1].getBytes(), writer);
+                }
+            }
+
+            private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
+/*                
+                if (kmerSize >= array.length) {
+                    return;
+                }
+                kmer.setByRead(array, 0);
+                InsertToFrame(kmer, readID, 1, writer);
+
+                
+                for (int i = kmerSize; i < array.length; i++) {
+                    kmer.shiftKmerWithNextChar(array[i]);
+                    InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
+                }
+
+                if (bReversed) {
+                    kmer.setByReadReverse(array, 0);
+                    InsertToFrame(kmer, readID, -1, writer);
+                    for (int i = kmerSize; i < array.length; i++) {
+                        kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
+                        InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
+                    }
+                }*/
+                ///////////////////////////////////////
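+                // a read must be at least k + 1 bases long: the first kmer plus one following base for its outgoing edge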
+                if (kmerSize >= array.length) {
+                    return;
+                }
+                /** first kmer **/
+                curForwardKmer.setByRead(array, 0);
+                curReverseKmer.set(kmerFactory.reverse(curForwardKmer));
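+                // canonical orientation: FORWARD if the forward kmer compares >= its reverse form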
+                curKmerDir = curForwardKmer.compareTo(curReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+                setNextKmer(array[kmerSize]);
+                readId.set(mateId, readID);
+                outputNode.setreadId(readId);
+                setEdgeListForNextKmer();
+                switch (curKmerDir) {
+                    case FORWARD:
+                        InsertToFrame(curForwardKmer, outputNode, writer);
+                        break;
+                    case REVERSE:
+                        InsertToFrame(curReverseKmer, outputNode, writer);
+                        break;
+                }
+                /** middle kmer **/
+                for (int i = kmerSize + 1; i < array.length; i++) {
+                    setPreKmerByOldCurKmer();
+                    setCurKmerByOldNextKmer();
+                    setNextKmer(array[i]);
+                    //set value.readId
+                    readId.set(mateId, readID);
+                    outputNode.setreadId(readId);
+                    //set value.edgeList
+                    setEdgeListForPreKmer();
+                    setEdgeListForNextKmer();
+                    //output mapper result
+                    switch (curKmerDir) {
+                        case FORWARD:
+                            InsertToFrame(curForwardKmer, outputNode, writer);
+                            break;
+                        case REVERSE:
+                            InsertToFrame(curReverseKmer, outputNode, writer);
+                            break;
+                    }
+                }
+                /** last kmer **/
+                setPreKmerByOldCurKmer();
+                setCurKmerByOldNextKmer();
+                //set value.readId
+                readId.set(mateId, readID);
+                outputNode.setreadId(readId);
+                //set value.edgeList
+                setEdgeListForPreKmer();
+                //output mapper result
+                switch (curKmerDir) {
+                    case FORWARD:
+                        InsertToFrame(curForwardKmer, outputNode, writer);
+                        break;
+                    case REVERSE:
+                        InsertToFrame(curReverseKmer, outputNode, writer);
+                        break;
+                }
+            }
+
+            public void setPreKmer(byte preChar) {
+                preForwardKmer.set(curForwardKmer);
+                preForwardKmer.shiftKmerWithPreChar(preChar);
+                preReverseKmer.set(kmerFactory.reverse(preForwardKmer));
+                preKmerDir = preForwardKmer.compareTo(preReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+            }
+            
+            public void setNextKmer(byte nextChar) {
+                nextForwardKmer.set(curForwardKmer);
+                nextForwardKmer.shiftKmerWithNextChar(nextChar);
+                nextReverseKmer.set(kmerFactory.reverse(nextForwardKmer));
+                nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+            }
+
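+            // old curKmer becomes the new preKmer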
+            public void setPreKmerByOldCurKmer() {
+                preKmerDir = curKmerDir;
+                preForwardKmer.set(curForwardKmer);
+                preReverseKmer.set(curReverseKmer);
+            }
+
+            //old nextKmer becomes current curKmer
+            public void setCurKmerByOldNextKmer() {
+                curKmerDir = nextKmerDir;
+                curForwardKmer.set(nextForwardKmer);
+                curReverseKmer.set(nextReverseKmer);
+            }
+
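+            // record the edge from the current kmer to the next kmer, stored in the list named by the (cur, next) orientation pair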
+            public void setEdgeListForNextKmer() {
+                switch (curKmerDir) {
+                    case FORWARD:
+                        switch (nextKmerDir) {
+                            case FORWARD:
+                                kmerList.reset();
+                                kmerList.append(nextForwardKmer);
+                                outputNode.setFFList(kmerList);
+                                break;
+                            case REVERSE:
+                                kmerList.reset();
+                                kmerList.append(nextReverseKmer);
+                                outputNode.setFRList(kmerList);
+                                break;
+                        }
+                        break;
+                    case REVERSE:
+                        switch (nextKmerDir) {
+                            case FORWARD:
+                                kmerList.reset();
+                                kmerList.append(nextForwardKmer);
+                                outputNode.setRFList(kmerList);
+                                break;
+                            case REVERSE:
+                                kmerList.reset();
+                                kmerList.append(nextReverseKmer);
+                                outputNode.setRRList(kmerList);
+                                break;
+                        }
+                        break;
+                }
+            }
+
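+            // record the edge back to the previous kmer; the orientation pair is mirrored (e.g. FORWARD/FORWARD goes into the RR list)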
+            public void setEdgeListForPreKmer() {
+                switch (curKmerDir) {
+                    case FORWARD:
+                        switch (preKmerDir) {
+                            case FORWARD:
+                                kmerList.reset();
+                                kmerList.append(preForwardKmer);
+                                outputNode.setRRList(kmerList);
+                                break;
+                            case REVERSE:
+                                kmerList.reset();
+                                kmerList.append(preReverseKmer);
+                                outputNode.setRFList(kmerList);
+                                break;
+                        }
+                        break;
+                    case REVERSE:
+                        switch (preKmerDir) {
+                            case FORWARD:
+                                kmerList.reset();
+                                kmerList.append(preForwardKmer);
+                                outputNode.setFRList(kmerList);
+                                break;
+                            case REVERSE:
+                                kmerList.reset();
+                                kmerList.append(preReverseKmer);
+                                outputNode.setFFList(kmerList);
+                                break;
+                        }
+                        break;
+                }
+            }
+
+            private void InsertToFrame(KmerBytesWritable kmer, IntermediateNodeWritable Node, IFrameWriter writer) {
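+                // tuple layout: kmer, readId, FF list, FR list, RF list, RR list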
+                try {
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+                    
+                    tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
+                    
+                    tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
+                            .getLength());
+                    tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
+                            .getLength());
+                    tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
+                            .getLength());
+                    tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
+                            .getLength());
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy an record into a frame: the record kmerByteSize is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+        };
+    }
+
+}