Merge branch 'jianfeng/genomix' into jianfeng/genomix-reverse

Conflicts:
	genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
	genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
	genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index c0b00a7..65931a2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -13,21 +13,22 @@
      */
     private static final long serialVersionUID = 1L;
     private PositionWritable nodeID;
-    private PositionListWritable incomingList;
-    private PositionListWritable outgoingList;
+    private PositionListWritable forwardForwardList;
+    private PositionListWritable forwardReverseList;
+    private PositionListWritable reverseForwardList;
+    private PositionListWritable reverseReverseList;
     private KmerBytesWritable kmer;
 
     public NodeWritable() {
-        nodeID = new PositionWritable();
-        incomingList = new PositionListWritable();
-        outgoingList = new PositionListWritable();
-        kmer = new KmerBytesWritable();
+        this(21);
     }
 
     public NodeWritable(int kmerSize) {
-        nodeID = new PositionWritable();
-        incomingList = new PositionListWritable();
-        outgoingList = new PositionListWritable();
+        nodeID = new PositionWritable(0,(byte) 0);
+        forwardForwardList = new PositionListWritable();
+        forwardReverseList = new PositionListWritable();
+        reverseForwardList = new PositionListWritable();
+        reverseReverseList = new PositionListWritable();
         kmer = new KmerBytesWritable(kmerSize);
     }
 
@@ -39,32 +40,33 @@
         nodeID.set(readID, posInRead);
     }
 
-    public void setIncomingList(PositionListWritable incoming) {
-        incomingList.set(incoming);
-    }
-
-
-    public void setOutgoingList(PositionListWritable outgoing) {
-        outgoingList.set(outgoing);
-    }
-
     public void setKmer(KmerBytesWritable right) {
         this.kmer.set(right);
     }
     
     public void reset(int kmerSize) {
         nodeID.set(0, (byte) 0);
-        incomingList.reset();
-        outgoingList.reset();
+        forwardForwardList.reset();
+        forwardReverseList.reset();
+        reverseForwardList.reset();
+        reverseReverseList.reset();
         kmer.reset(kmerSize);
     }
 
-    public PositionListWritable getIncomingList() {
-        return incomingList;
+    public PositionListWritable getFFList() {
+        return forwardForwardList;
+    }
+    
+    public PositionListWritable getFRList() {
+        return forwardReverseList;
     }
 
-    public PositionListWritable getOutgoingList() {
-        return outgoingList;
+    public PositionListWritable getRFList() {
+        return reverseForwardList;
+    }
+    
+    public PositionListWritable getRRList() {
+        return reverseReverseList;
     }
 
     public PositionWritable getNodeID() {
@@ -79,36 +81,44 @@
         return kmer.getKmerLength();
     }
 
-    public void mergeNext(NodeWritable nextNode, int initialKmerSize) {
-        this.outgoingList.set(nextNode.outgoingList);
+    public void mergeForwadNext(NodeWritable nextNode, int initialKmerSize) {
+        this.forwardForwardList.set(nextNode.forwardForwardList);
+        this.forwardReverseList.set(nextNode.forwardReverseList);
         kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
     }
     
-    public void mergePre(NodeWritable preNode, int initialKmerSize){
-        this.incomingList.set(preNode.incomingList);
+    public void mergeForwardPre(NodeWritable preNode, int initialKmerSize){
+        this.reverseForwardList.set(preNode.reverseForwardList);
+        this.reverseReverseList.set(preNode.reverseReverseList);
         kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
     }
 
     public void set(NodeWritable node) {
         this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
-        this.incomingList.set(node.getIncomingList());
-        this.outgoingList.set(node.getOutgoingList());
+        this.forwardForwardList.set(node.forwardForwardList);
+        this.forwardReverseList.set(node.forwardReverseList);
+        this.reverseForwardList.set(node.reverseForwardList);
+        this.reverseReverseList.set(node.reverseReverseList);
         this.kmer.set(node.kmer);
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
         this.nodeID.readFields(in);
-        this.incomingList.readFields(in);
-        this.outgoingList.readFields(in);
+        this.forwardForwardList.readFields(in);
+        this.forwardReverseList.readFields(in);
+        this.reverseForwardList.readFields(in);
+        this.reverseReverseList.readFields(in);
         this.kmer.readFields(in);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         this.nodeID.write(out);
-        this.incomingList.write(out);
-        this.outgoingList.write(out);
+        this.forwardForwardList.write(out);
+        this.forwardReverseList.write(out);
+        this.reverseForwardList.write(out);
+        this.reverseReverseList.write(out);
         this.kmer.write(out);
     }
 
@@ -127,17 +137,27 @@
         StringBuilder sbuilder = new StringBuilder();
         sbuilder.append('(');
         sbuilder.append(nodeID.toString()).append('\t');
-        sbuilder.append(incomingList.toString()).append('\t');
-        sbuilder.append(outgoingList.toString()).append('\t');
+        sbuilder.append(forwardForwardList.toString()).append('\t');
+        sbuilder.append(forwardReverseList.toString()).append('\t');
+        sbuilder.append(reverseForwardList.toString()).append('\t');
+        sbuilder.append(reverseReverseList.toString()).append('\t');
         sbuilder.append(kmer.toString()).append(')');
         return sbuilder.toString();
     }
     
+    public int inDegree(){
+        return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+    }
+    
+    public int outDegree(){
+        return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+    }
+    
     /*
      * Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1 
      */
     public boolean isPathNode() {
-        return incomingList.getCountOfPosition() == 1 && outgoingList.getCountOfPosition() == 1;
+        return inDegree() == 1 && outDegree() == 1;
     }
 
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
deleted file mode 100644
index 55c6ca7..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-public class NoteWritableFactory {
-    private NodeWritable node;
-    private KmerBytesWritableFactory kmerBytesWritableFactory;
-    private int kmerSize = 55;
-
-    public NoteWritableFactory() {
-        node = new NodeWritable();
-        kmerBytesWritableFactory = new KmerBytesWritableFactory(kmerSize);
-    }
-
-    public NodeWritable append(final NodeWritable orignalNode, final KmerBytesWritable appendKmer){
-        KmerBytesWritable preKmer = orignalNode.getKmer();
-        node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(preKmer,appendKmer));
-        return node;
-    }
-    
-    public NodeWritable append(final NodeWritable orignalNode, final NodeWritable appendNode) {
-        KmerBytesWritable nextKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2, appendNode.getKmer().kmerlength - kmerSize + 2,
-                appendNode.getKmer());
-        return append(orignalNode, nextKmer);
-    }
-
-    public NodeWritable prepend(final NodeWritable orignalNode, final KmerBytesWritable prependKmer){
-        KmerBytesWritable nextKmer = orignalNode.getKmer();
-        node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(prependKmer,nextKmer));
-        return node;
-    }
-    
-    public NodeWritable prepend(final NodeWritable orignalNode, final NodeWritable prependNode) {
-        KmerBytesWritable prependKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2, orignalNode.getKmer().kmerlength - kmerSize + 2,
-                orignalNode.getKmer());
-        return prepend(orignalNode, prependKmer);
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index f77c844..10821b0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -42,7 +42,7 @@
     public void set(PositionWritable pos) {
         set(pos.getReadID(), pos.getPosInRead());
     }
-    
+
     public void set(int readID, byte posInRead) {
         Marshal.putInt(readID, storage, offset);
         storage[offset + INTBYTES] = posInRead;
@@ -67,11 +67,11 @@
     public int getLength() {
         return LENGTH;
     }
-    
-    public boolean isSameReadID(PositionWritable other){
+
+    public boolean isSameReadID(PositionWritable other) {
         return getReadID() == other.getReadID();
     }
-    
+
     @Override
     public void readFields(DataInput in) throws IOException {
         in.readFully(storage, offset, LENGTH);
@@ -138,7 +138,7 @@
             return diff;
         }
     }
-    
+
     static { // register this comparator
         WritableComparator.define(PositionWritable.class, new Comparator());
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 0b0dcf2..f98c684 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -13,5 +13,7 @@
         super(kmerSize);
         // TODO Auto-generated constructor stub
     }
+
+
  
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 53bd79d..f625143 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -24,11 +24,15 @@
 
 public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
 
-    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc, int readlength,
+            int kmerSize) {
         super(spec, 1, 1);
         recordDescriptors[0] = recDesc;
+        LAST_POSID = readlength - kmerSize + 1;
     }
 
+    private final int LAST_POSID;
+
     private static final long serialVersionUID = 1L;
     public static final int InputKmerField = 0;
     public static final int InputPosListField = 1;
@@ -93,6 +97,10 @@
             }
         }
 
+        private boolean isStart(byte posInRead) {
+            return posInRead == 1 || posInRead == -LAST_POSID;
+        }
+
         private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
                 ArrayBackedValueStorage noneZeroPositionCollection2) {
             zeroPositionCollection2.reset();
@@ -102,7 +110,7 @@
                     + accessor.getFieldStartOffset(tIndex, InputPosListField);
             for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
                 positionEntry.setNewReference(data, offsetPoslist + i);
-                if (positionEntry.getPosInRead() == 0) {
+                if (isStart(positionEntry.getPosInRead())) {
                     zeroPositionCollection2.append(positionEntry);
                 } else {
                     noneZeroPositionCollection2.append(positionEntry);
@@ -118,7 +126,7 @@
                     + accessor.getFieldStartOffset(tIndex, InputPosListField);
             for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
                 positionEntry.setNewReference(data, offsetPoslist + i);
-                if (positionEntry.getPosInRead() != 0) {
+                if (!isStart(positionEntry.getPosInRead())) {
                     appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
                 } else {
                     appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
@@ -137,11 +145,16 @@
                     writePosToFieldAndSkipSameReadID(pos, builder2.getDataOutput(), posList2);
                     builder2.addFieldEndOffset();
                 }
-                // set kmer, may not useful
-                byte[] data = accessor.getBuffer().array();
-                int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, InputKmerField);
-                builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+                // set kmer, which may not be useful;
+                // a reversed ID doesn't need to output the kmer
+                if (pos.getPosInRead() > 0) {
+                    byte[] data = accessor.getBuffer().array();
+                    int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(tIndex, InputKmerField);
+                    builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+                } else {
+                    builder2.addFieldEndOffset();
+                }
 
                 if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
                     FrameUtils.flushFrame(writeBuffer, writer);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index d80ed13..a775d7e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -40,12 +40,13 @@
 
     public static final int OutputNodeIDField = 0;
     public static final int OutputCountOfKmerField = 1;
-    public static final int OutputIncomingField = 2;
-    public static final int OutputOutgoingField = 3;
-    public static final int OutputKmerBytesField = 4;
+    public static final int OutputForwardForwardField = 2;
+    public static final int OutputForwardReverseField = 3;
+    public static final int OutputReverseForwardField = 4;
+    public static final int OutputReverseReverseField = 5;
+    public static final int OutputKmerBytesField = 6;
 
-    public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-            null, null, null, null });
+    public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[7]);
 
     /**
      * (ReadID, Storage[posInRead]={len, PositionList, len, Kmer})
@@ -57,6 +58,8 @@
         private final RecordDescriptor inputRecDesc;
         private final RecordDescriptor outputRecDesc;
 
+        private final int LAST_POSITION_ID;
+
         private FrameTupleAccessor accessor;
         private ByteBuffer writeBuffer;
         private ArrayTupleBuilder builder;
@@ -66,6 +69,8 @@
         private NodeReference nextNodeEntry;
         private NodeReference nextNextNodeEntry;
 
+        private PositionListWritable cachePositionList;
+
         public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
                 RecordDescriptor outputRecDesc) {
             this.ctx = ctx;
@@ -74,6 +79,8 @@
             curNodeEntry = new NodeReference(kmerSize);
             nextNodeEntry = new NodeReference(kmerSize);
             nextNextNodeEntry = new NodeReference(0);
+            cachePositionList = new PositionListWritable();
+            LAST_POSITION_ID = (inputRecDesc.getFieldCount() - InputInfoFieldStart) / 2;
         }
 
         @Override
@@ -100,113 +107,158 @@
             int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
             int readID = accessor.getBuffer().getInt(
                     offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
-            resetNode(curNodeEntry, readID, (byte) 0,
-                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), true);
+            if ((accessor.getFieldCount() - InputInfoFieldStart) % 2 != 0) {
+                throw new IllegalArgumentException("field length is odd");
+            }
 
-            for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
-                resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
-                        offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
-                NodeReference pNextNext = null;
-                if (i + 1 < accessor.getFieldCount()) {
-                    resetNode(nextNextNodeEntry, readID, (byte) (i - InputInfoFieldStart + 1),
-                            offsetPoslist + accessor.getFieldStartOffset(tIndex, i + 1), false);
-                    pNextNext = nextNextNodeEntry;
-                }
+            resetNode(curNodeEntry, readID, (byte) (1));
+            setForwardIncomingList(curNodeEntry,
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+            setKmer(curNodeEntry.getKmer(), offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+            if (curNodeEntry.getNodeID().getPosInRead() == LAST_POSITION_ID) {
+                setReverseIncomingList(curNodeEntry,
+                        offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart + 1));
+            }
 
-                if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
-                    if (pNextNext == null || pNextNext.getOutgoingList().getCountOfPosition() == 0) {
-                        curNodeEntry.mergeNext(nextNodeEntry, kmerSize);
-                    } else {
-                        curNodeEntry.getOutgoingList().reset();
-                        curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
-                        outputNode(curNodeEntry);
+            // next Node
+            readNodesInfo(tIndex, readID, curNodeEntry, nextNodeEntry, InputInfoFieldStart);
 
-                        nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
-                        curNodeEntry.set(nextNodeEntry);
-                    }
-                } else { // nextNode entry outgoing > 0
-                    curNodeEntry.getOutgoingList().set(nextNodeEntry.getOutgoingList());
-                    curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
-                    nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+            for (int i = InputInfoFieldStart + 2; i < accessor.getFieldCount(); i += 2) {
+                readNodesInfo(tIndex, readID, nextNodeEntry, nextNextNodeEntry, i);
+
+                if (curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0 || nextNodeEntry.inDegree() > 0
+                        || nextNodeEntry.outDegree() > 0 || nextNextNodeEntry.inDegree() > 0
+                        || nextNextNodeEntry.outDegree() > 0) {
+                    connect(curNodeEntry, nextNodeEntry);
                     outputNode(curNodeEntry);
                     curNodeEntry.set(nextNodeEntry);
-                    curNodeEntry.getOutgoingList().reset();
+                    nextNodeEntry.set(nextNextNodeEntry);
+                    continue;
                 }
+                curNodeEntry.mergeForwadNext(nextNodeEntry, kmerSize);
+                nextNodeEntry.set(nextNextNodeEntry);
             }
             outputNode(curNodeEntry);
         }
 
-        /**
-         * The neighbor list is store in the next next node
-         * (3,2) [(1,0),(2,0)] will become the outgoing list of node (3,1)
-         * (3,1) [(1,0),(2,0)]
-         * 
-         * @param curNodeEntry2
-         * @param nextNodeEntry2
-         */
-        //        private void setOutgoingByNext(NodeReference curNodeEntry2, NodeReference nextNodeEntry2) {
-        //            if (nextNodeEntry2 == null) {
-        //                curNodeEntry2.getOutgoingList().reset();
-        //            } else {
-        //                curNodeEntry2.setOutgoingList(nextNodeEntry2.getOutgoingList());
-        //            }
-        //        }
-
-        private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean isInitial) {
-            node.reset(kmerSize);
-            node.setNodeID(readID, posInRead);
-
-            ByteBuffer buffer = accessor.getBuffer();
-            int lengthPos = buffer.getInt(offset);
-            int countPosition = PositionListWritable.getCountByDataLength(lengthPos);
-            offset += INT_LENGTH;
-            if (posInRead == 0) {
-                setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, true);
-                // minus 1 position of the incoming list to get the correct predecessor
-                for (PositionWritable pos : node.getIncomingList()) {
-                    if (pos.getPosInRead() == 0) {
-                        throw new IllegalArgumentException("The incoming position list contain invalid posInRead");
-                    }
-                    pos.set(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+        private void readNodesInfo(int tIndex, int readID, NodeReference curNode, NodeReference nextNode, int curFieldID) {
+            // nextNext node
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+            if (curFieldID + 2 < accessor.getFieldCount()) {
+                setForwardOutgoingList(curNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 2));
+                resetNode(nextNode, readID, (byte) (1 + (curFieldID + 2 - InputInfoFieldStart) / 2));
+                setKmer(nextNode.getKmer(), offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 2));
+                setReverseOutgoingList(nextNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 1));
+                if (nextNode.getNodeID().getPosInRead() == LAST_POSITION_ID) {
+                    setReverseIncomingList(nextNode,
+                            offsetPoslist + accessor.getFieldStartOffset(tIndex, curFieldID + 3));
                 }
             } else {
-                setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, isInitial);
-            }
-            offset += lengthPos;
-            int lengthKmer = buffer.getInt(offset);
-            if (node.getKmer().getLength() != lengthKmer) {
-                throw new IllegalStateException("Size of Kmer is invalid ");
-            }
-            setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, isInitial);
-        }
-
-        private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean isInitial) {
-            if (isInitial) {
-                kmer.set(array, offset);
-            } else {
-                kmer.setNewReference(array, offset);
+                resetNode(nextNode, readID, (byte) 0);
             }
         }
 
-        private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset,
-                boolean isInitial) {
-            if (isInitial) {
-                positionListWritable.set(count, array, offset);
-            } else {
-                positionListWritable.setNewReference(count, array, offset);
+        private void setKmer(KmerBytesWritable kmer, int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            int length = buffer.getInt(offset);
+            offset += INT_LENGTH + length;
+            length = buffer.getInt(offset);
+            if (kmer.getLength() != length) {
+                throw new IllegalArgumentException("kmer size is invalid");
+            }
+            offset += INT_LENGTH;
+            kmer.set(buffer.array(), offset);
+        }
+
+        private void connect(NodeReference curNode, NodeReference nextNode) {
+            curNode.getFFList().append(nextNode.getNodeID());
+            nextNode.getRRList().append(curNode.getNodeID());
+        }
+
+        private void setCachList(int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            int count = PositionListWritable.getCountByDataLength(buffer.getInt(offset));
+            cachePositionList.set(count, buffer.array(), offset + INT_LENGTH);
+        }
+
+        private void resetNode(NodeReference node, int readID, byte posInRead) {
+            node.reset(kmerSize);
+            node.setNodeID(readID, posInRead);
+        }
+
+        private void setReverseOutgoingList(NodeReference node, int offset) {
+            setCachList(offset);
+            for (PositionWritable pos : cachePositionList) {
+                if (pos.getPosInRead() > 0) {
+                    node.getRFList().append(pos);
+                } else {
+                    node.getRRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
+                }
+            }
+        }
+
+        private void setReverseIncomingList(NodeReference node, int offset) {
+            setCachList(offset);
+            for (PositionWritable pos : cachePositionList) {
+                if (pos.getPosInRead() > 0) {
+                    if (pos.getPosInRead() > 1) {
+                        node.getFRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+                    } else {
+                        throw new IllegalArgumentException("Invalid position");
+                    }
+                } else {
+                    if (pos.getPosInRead() > -LAST_POSITION_ID) {
+                        node.getFFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+                    }
+                }
+            }
+        }
+
+        private void setForwardOutgoingList(NodeReference node, int offset) {
+            setCachList(offset);
+            for (PositionWritable pos : cachePositionList) {
+                if (pos.getPosInRead() > 0) {
+                    node.getFFList().append(pos);
+                } else {
+                    node.getFRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
+                }
+            }
+        }
+
+        private void setForwardIncomingList(NodeReference node, int offset) {
+            setCachList(offset);
+            for (PositionWritable pos : cachePositionList) {
+                if (pos.getPosInRead() > 0) {
+                    if (pos.getPosInRead() > 1) {
+                        node.getRRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+                    } else {
+                        throw new IllegalArgumentException("position id is invalid");
+                    }
+                } else {
+                    if (pos.getPosInRead() > -LAST_POSITION_ID) {
+                        node.getRFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+                    }
+                }
             }
         }
 
         private void outputNode(NodeReference node) throws HyracksDataException {
+            if (node.getNodeID().getPosInRead() == 0) {
+                return;
+            }
             try {
                 builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
                         .getLength());
                 builder.getDataOutput().writeInt(node.getCount());
                 builder.addFieldEndOffset();
-                builder.addField(node.getIncomingList().getByteArray(), node.getIncomingList().getStartOffset(), node
-                        .getIncomingList().getLength());
-                builder.addField(node.getOutgoingList().getByteArray(), node.getOutgoingList().getStartOffset(), node
-                        .getOutgoingList().getLength());
+                builder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList()
+                        .getLength());
+                builder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList()
+                        .getLength());
+                builder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList()
+                        .getLength());
+                builder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList()
+                        .getLength());
                 builder.addField(node.getKmer().getBytes(), node.getKmer().getOffset(), node.getKmer().getLength());
 
                 if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
@@ -240,7 +292,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        // TODO Auto-generated method stub
         return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
                 recordDescriptors[0]);
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 855be64..fe8224f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -41,10 +41,10 @@
 public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
-    
+
     public static final int OutputKmerField = 0;
     public static final int OutputPosition = 1;
-    
+
     private KmerBytesWritable kmer;
     private PositionReference pos;
     private boolean bReversed;
@@ -79,7 +79,7 @@
                 try {
                     readID = Integer.parseInt(geneLine[0]);
                 } catch (NumberFormatException e) {
-                    LOG.warn("Invalid data " );
+                    LOG.warn("Invalid data ");
                     return;
                 }
 
@@ -87,8 +87,8 @@
                 Matcher geneMatcher = genePattern.matcher(geneLine[1]);
                 boolean isValid = geneMatcher.matches();
                 if (isValid) {
-                    if (geneLine[1].length() != readLength){
-                        LOG.warn("Invalid readlength at: " + readID );
+                    if (geneLine[1].length() != readLength) {
+                        LOG.warn("Invalid readlength at: " + readID);
                         return;
                     }
                     SplitReads(readID, geneLine[1].getBytes(), writer);
@@ -102,29 +102,29 @@
                     return;
                 }
                 kmer.setByRead(array, 0);
-                InsertToFrame(kmer, readID, 0, writer);
+                InsertToFrame(kmer, readID, 1, writer);
 
                 /** middle kmer */
                 for (int i = k; i < array.length; i++) {
                     kmer.shiftKmerWithNextChar(array[i]);
-                    InsertToFrame(kmer, readID, i - k + 1, writer);
+                    InsertToFrame(kmer, readID, i - k + 2, writer);
                 }
 
                 if (bReversed) {
                     /** first kmer */
                     kmer.setByReadReverse(array, 0);
-                    InsertToFrame(kmer, -readID, array.length - k, writer);
+                    InsertToFrame(kmer, readID, -1, writer);
                     /** middle kmer */
                     for (int i = k; i < array.length; i++) {
                         kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
-                        InsertToFrame(kmer, -readID, array.length - i - 1, writer);
+                        InsertToFrame(kmer, readID, -(i - k + 2), writer);
                     }
                 }
             }
 
             private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
                 try {
-                    if (posInRead > 127) {
+                    if (Math.abs(posInRead) > 127) {
                         throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
                     }
                     tupleBuilder.reset();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index 57ba91a..d0fc24b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -75,7 +75,9 @@
                 try {
                     out.writeByte(posInRead);
                     writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
-                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                    if (posInRead > 0) {
+                        writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                    }
                 } catch (IOException e) {
                     throw new HyracksDataException("Failed to write into temporary storage");
                 }
@@ -108,7 +110,9 @@
                 try {
                     out.writeByte(posInRead);
                     writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
-                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                    if (posInRead > 0) {
+                        writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                    }
                 } catch (IOException e) {
                     throw new HyracksDataException("Failed to write into temporary storage");
                 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index f1ebdff..0d51035 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -48,17 +48,24 @@
         return new IAggregatorDescriptor() {
 
             class PositionArray {
-                public ArrayBackedValueStorage[] storages;
+                public ArrayBackedValueStorage[] forwardStorages;
+                public ArrayBackedValueStorage[] reverseStorages;
                 public int count;
 
-                public PositionArray(ArrayBackedValueStorage[] storages2, int i) {
-                    storages = storages2;
-                    count = i;
+                public PositionArray() {
+                    forwardStorages = new ArrayBackedValueStorage[ValidPosCount];
+                    reverseStorages = new ArrayBackedValueStorage[ValidPosCount];
+                    for (int i = 0; i < ValidPosCount; i++) {
+                        forwardStorages[i] = new ArrayBackedValueStorage();
+                        reverseStorages[i] = new ArrayBackedValueStorage();
+                    }
+                    count = 0;
                 }
 
                 public void reset() {
-                    for (ArrayBackedValueStorage each : storages) {
-                        each.reset();
+                    for (int i = 0; i < ValidPosCount; i++) {
+                        forwardStorages[i].reset();
+                        reverseStorages[i].reset();
                     }
                     count = 0;
                 }
@@ -66,11 +73,8 @@
 
             @Override
             public AggregateState createAggregateStates() {
-                ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
-                for (int i = 0; i < storages.length; i++) {
-                    storages[i] = new ArrayBackedValueStorage();
-                }
-                return new AggregateState(new PositionArray(storages, 0));
+
+                return new AggregateState(new PositionArray());
             }
 
             @Override
@@ -82,29 +86,38 @@
                 pushIntoStorage(accessor, tIndex, positionArray);
 
                 // make fake fields
-                for (int i = 0; i < ValidPosCount; i++) {
+                for (int i = 0; i < ValidPosCount * 2; i++) {
                     tupleBuilder.addFieldEndOffset();
                 }
             }
 
             private void pushIntoStorage(IFrameTupleAccessor accessor, int tIndex, PositionArray positionArray)
                     throws HyracksDataException {
-                ArrayBackedValueStorage[] storages = positionArray.storages;
                 int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                 int fieldOffset = leadbyte + accessor.getFieldStartOffset(tIndex, InputPositionListField);
                 ByteBuffer fieldBuffer = accessor.getBuffer();
 
                 while (fieldOffset < leadbyte + accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
                     byte posInRead = fieldBuffer.get(fieldOffset);
-                    if (storages[posInRead].getLength() > 0) {
+
+                    ArrayBackedValueStorage[] storage = positionArray.forwardStorages;
+                    boolean hasKmer = true;
+                    if (posInRead < 0) {
+                        storage = positionArray.reverseStorages;
+                        posInRead = (byte) -posInRead;
+                        hasKmer = false;
+                    }
+                    if (storage[posInRead - 1].getLength() > 0) {
                         throw new IllegalArgumentException("Reentering into an exist storage");
                     }
                     fieldOffset += BYTE_SIZE;
 
                     // read poslist
-                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    fieldOffset += writeBytesToStorage(storage[posInRead - 1], fieldBuffer, fieldOffset);
                     // read Kmer
-                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    if (hasKmer) {
+                        fieldOffset += writeBytesToStorage(storage[posInRead - 1], fieldBuffer, fieldOffset);
+                    }
 
                     positionArray.count += 1;
                 }
@@ -149,15 +162,21 @@
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 PositionArray positionArray = (PositionArray) state.state;
-                ArrayBackedValueStorage[] storages = positionArray.storages;
-                if (positionArray.count != storages.length) {
+
+                if (positionArray.count != ValidPosCount * 2) {
                     throw new IllegalStateException("Final aggregate position number is invalid");
                 }
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
                 try {
-                    for (int i = 0; i < storages.length; i++) {
-                        fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(),
-                                storages[i].getLength());
+                    for (int i = 0; i < ValidPosCount; i++) {
+                        fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
+                                positionArray.forwardStorages[i].getStartOffset(),
+                                positionArray.forwardStorages[i].getLength());
+                        tupleBuilder.addFieldEndOffset();
+
+                        fieldOutput.write(positionArray.reverseStorages[i].getByteArray(),
+                                positionArray.reverseStorages[i].getStartOffset(),
+                                positionArray.reverseStorages[i].getLength());
                         tupleBuilder.addFieldEndOffset();
                     }
                 } catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 7c70aa9..00409ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -32,8 +32,11 @@
 
     public static final int InputNodeIDField = MapReadToNodeOperator.OutputNodeIDField;
     public static final int InputCountOfKmerField = MapReadToNodeOperator.OutputCountOfKmerField;
-    public static final int InputIncomingField = MapReadToNodeOperator.OutputIncomingField;
-    public static final int InputOutgoingField = MapReadToNodeOperator.OutputOutgoingField;
+    public static final int InputFFField = MapReadToNodeOperator.OutputForwardForwardField;
+    public static final int InputFRField = MapReadToNodeOperator.OutputForwardReverseField;
+    public static final int InputRFField = MapReadToNodeOperator.OutputReverseForwardField;
+    public static final int InputRRField = MapReadToNodeOperator.OutputReverseReverseField;
+    
     public static final int InputKmerBytesField = MapReadToNodeOperator.OutputKmerBytesField;
 
     private ConfFactory confFactory;
@@ -68,11 +71,15 @@
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             node.getNodeID().setNewReference(tuple.getFieldData(InputNodeIDField),
                     tuple.getFieldStart(InputNodeIDField));
-            node.getIncomingList().setNewReference(tuple.getFieldLength(InputIncomingField) / PositionWritable.LENGTH,
-                    tuple.getFieldData(InputIncomingField), tuple.getFieldStart(InputIncomingField));
-            node.getOutgoingList().setNewReference(tuple.getFieldLength(InputOutgoingField) / PositionWritable.LENGTH,
-                    tuple.getFieldData(InputOutgoingField), tuple.getFieldStart(InputOutgoingField));
-
+            node.getFFList().setNewReference(tuple.getFieldLength(InputFFField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputFFField), tuple.getFieldStart(InputFFField));
+            node.getFRList().setNewReference(tuple.getFieldLength(InputFRField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputFRField), tuple.getFieldStart(InputFRField));
+            node.getRFList().setNewReference(tuple.getFieldLength(InputRFField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputRFField), tuple.getFieldStart(InputRFField));
+            node.getRRList().setNewReference(tuple.getFieldLength(InputRRField) / PositionWritable.LENGTH,
+                    tuple.getFieldData(InputRRField), tuple.getFieldStart(InputRRField));
+            
             node.getKmer().setNewReference(
                     Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
                             tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index c24760f..cec702e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -5,7 +5,7 @@
 
 import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -39,16 +39,22 @@
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
                 node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
                         tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
-                node.getIncomingList().setNewReference(
-                        PositionListWritable.getCountByDataLength(tuple
-                                .getFieldLength(NodeSequenceWriterFactory.InputIncomingField)),
-                        tuple.getFieldData(NodeSequenceWriterFactory.InputIncomingField),
-                        tuple.getFieldStart(NodeSequenceWriterFactory.InputIncomingField));
-                node.getOutgoingList().setNewReference(
-                        PositionListWritable.getCountByDataLength(tuple
-                                .getFieldLength(NodeSequenceWriterFactory.InputOutgoingField)),
-                        tuple.getFieldData(NodeSequenceWriterFactory.InputOutgoingField),
-                        tuple.getFieldStart(NodeSequenceWriterFactory.InputOutgoingField));
+                node.getFFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFFField));
+                node.getFRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputFRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputFRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputFRField));
+                node.getRFList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRFField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRFField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRFField));
+                node.getRRList().setNewReference(
+                        tuple.getFieldLength(NodeSequenceWriterFactory.InputRRField) / PositionWritable.LENGTH,
+                        tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
+                        tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
 
                 node.getKmer().setNewReference(
                         Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index fe9afdf..2d420c3 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -59,7 +59,7 @@
     public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
     public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
 
-    public static final boolean DEFAULT_REVERSED = false;
+    public static final boolean DEFAULT_REVERSED = true;
 
     public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
     public static final String JOB_PLAN_GRAPHSTAT  = "graphstat";
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 02d9f9c..94d619a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -220,7 +220,7 @@
         // (ReadID,PosInRead,{OtherPosition,...},Kmer)
 
         AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
-                MapKmerPositionToReadOperator.readIDOutputRec);
+                MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
         connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
         return mapKmerToRead;
@@ -237,7 +237,7 @@
                 jobSpec));
 
         RecordDescriptor readIDFinalRec = new RecordDescriptor(
-                new ISerializerDeserializer[1 + MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+                new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
         Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
                 new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
                 new ReadIDNormarlizedComputeFactory(), ReadIDPointable.FACTORY,
@@ -283,7 +283,6 @@
                 hadoopJobConfFactory.getConf(), kmerWriter);
         connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
-        // jobSpec.addRoot(writeKmerOperator);
         return writeKmerOperator;
     }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index fb0a3de..201be03 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -99,19 +99,27 @@
                                             plist.setNewReference(posCount, buffer, fieldOffset);
                                             fieldOffset += plist.getLength();
 
-                                            int kmerbytes = Marshal.getInt(buffer, fieldOffset);
-                                            if (kmer.getLength() != kmerbytes) {
-                                                throw new IllegalArgumentException("kmerlength is invalid");
+                                            int posInRead = (i + 1) / 2;
+                                            if (i % 2 == 0) {
+                                                posInRead = -posInRead;
                                             }
-                                            fieldOffset += 4;
-                                            kmer.setNewReference(buffer, fieldOffset);
-                                            fieldOffset += kmer.getLength();
+                                            String kmerString = "";
+                                            if (posInRead > 0) {
+                                                int kmerbytes = Marshal.getInt(buffer, fieldOffset);
+                                                if (kmer.getLength() != kmerbytes) {
+                                                    throw new IllegalArgumentException("kmerlength is invalid");
+                                                }
+                                                fieldOffset += 4;
+                                                kmer.setNewReference(buffer, fieldOffset);
+                                                fieldOffset += kmer.getLength();
+                                                kmerString = kmer.toString();
+                                            }
 
-                                            output.write(Integer.toString(i - 1).getBytes());
+                                            output.write(Integer.toString(posInRead).getBytes());
                                             output.writeByte('\t');
                                             output.write(plist.toString().getBytes());
                                             output.writeByte('\t');
-                                            output.write(kmer.toString().getBytes());
+                                            output.write(kmerString.getBytes());
                                             output.writeByte('\t');
                                         }
                                     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index c399603..968123e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -76,13 +76,17 @@
                                             tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
                                             tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
 
-                                    if (kmer.getLength() > tuple
-                                            .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
-                                        throw new IllegalArgumentException("Not enough kmer bytes");
+                                    String kmerString = ""; // stays empty when posInRead <= 0: no kmer is read for those tuples
+                                    if (posInRead > 0) {
+                                        if (kmer.getLength() > tuple // length-check the same field that is dereferenced below
+                                                .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+                                            throw new IllegalArgumentException("Not enough kmer bytes");
+                                        }
+                                        kmer.setNewReference(
+                                                tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+                                                tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+                                        kmerString = kmer.toString();
+                                    }
-                                    kmer.setNewReference(
-                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
-                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
 
                                     output.write(Integer.toString(readID).getBytes());
                                     output.writeByte('\t');
@@ -90,7 +94,7 @@
                                     output.writeByte('\t');
                                     output.write(plist.toString().getBytes());
                                     output.writeByte('\t');
-                                    output.write(kmer.toString().getBytes());
+                                    output.write(kmerString.getBytes());
                                     output.writeByte('\n');
                                 } catch (IOException e) {
                                     throw new HyracksDataException(e);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index df25fd7..07cfab4 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -94,7 +94,7 @@
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2,5,8,11}));
+        Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2,5,8,11,14,17,20,23}));
     }
 
     public void TestEndToEnd() throws Exception {
@@ -103,7 +103,7 @@
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_NODE, new int[] {1,2}));
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_NODE, new int[] {1,2,3,4}));
     }
 
     @Before
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 13190dd..01c49e5 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,6 +1,6 @@
 1	AATAGAAG

-2	AATAGAAG

+2	AATAGCTT

 3	AATAGAAG

-4	AATAGAAG

+4	AATAGCTT

 5	AATAGAAG

 6	AGAAGAAG

diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
index 2988303..9334b95 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
@@ -1,16 +1,14 @@
-((1,0)	[]	[(1,2)]	AATAGA)
-((1,2)	[(1,0)]	[(6,0),(1,3)]	TAGAA)
-((1,3)	[(1,2)]	[]	AGAAG)
-((2,0)	[]	[(2,2)]	AATAGA)
-((2,2)	[(2,0)]	[(6,0),(2,3)]	TAGAA)
-((2,3)	[(2,2)]	[]	AGAAG)
-((3,0)	[]	[(3,2)]	AATAGA)
-((3,2)	[(3,0)]	[(6,0),(3,3)]	TAGAA)
-((3,3)	[(3,2)]	[]	AGAAG)
-((4,0)	[]	[(4,2)]	AATAGA)
-((4,2)	[(4,0)]	[(6,0),(4,3)]	TAGAA)
-((4,3)	[(4,2)]	[]	AGAAG)
-((5,0)	[]	[(5,2)]	AATAGA)
-((5,2)	[(5,0)]	[(6,0),(5,3)]	TAGAA)
-((5,3)	[(5,2)]	[]	AGAAG)
-((6,0)	[(1,2),(2,2),(3,2),(5,2),(4,2)]	[]	AGAAGAAG)
+((1,1)	[(1,3)]	[]	[]	[]	AATAGA)
+((1,3)	[(6,1),(1,4)]	[]	[]	[(1,1)]	TAGAA)
+((1,4)	[(6,2)]	[]	[]	[(1,3)]	AGAAG)
+((2,1)	[]	[]	[]	[]	AATAGCTT)
+((3,1)	[(3,3)]	[]	[]	[]	AATAGA)
+((3,3)	[(6,1),(3,4)]	[]	[]	[(3,1)]	TAGAA)
+((3,4)	[(6,2)]	[]	[]	[(3,3)]	AGAAG)
+((4,1)	[]	[]	[]	[]	AATAGCTT)
+((5,1)	[(5,3)]	[]	[]	[]	AATAGA)
+((5,3)	[(6,1),(5,4)]	[]	[]	[(5,1)]	TAGAA)
+((5,4)	[(6,2)]	[]	[]	[(5,3)]	AGAAG)
+((6,1)	[(6,2)]	[]	[]	[(5,3),(3,3),(1,3)]	AGAAG)
+((6,2)	[(6,3)]	[]	[]	[(1,4),(3,4),(5,4),(6,1)]	GAAGA)
+((6,3)	[]	[]	[]	[(6,2)]	AAGAAG)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
index 1091d2e..3502e95 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -1,24 +1,48 @@
-AAGAA	(6,2)
-AATAG	(1,0)
-AATAG	(2,0)
-AATAG	(3,0)
-AATAG	(4,0)
-AATAG	(5,0)
-AGAAG	(1,3)
-AGAAG	(2,3)
-AGAAG	(3,3)
-AGAAG	(4,3)
-AGAAG	(5,3)
-AGAAG	(6,0)
-AGAAG	(6,3)
-ATAGA	(1,1)
-ATAGA	(2,1)
-ATAGA	(3,1)
-ATAGA	(4,1)
-ATAGA	(5,1)
-GAAGA	(6,1)
-TAGAA	(1,2)
-TAGAA	(2,2)
-TAGAA	(3,2)
-TAGAA	(4,2)
-TAGAA	(5,2)
+AAGAA	(6,3)
+AAGCT	(2,-4)
+AAGCT	(4,-4)
+AATAG	(1,1)
+AATAG	(2,1)
+AATAG	(3,1)
+AATAG	(4,1)
+AATAG	(5,1)
+AGAAG	(1,4)
+AGAAG	(3,4)
+AGAAG	(5,4)
+AGAAG	(6,1)
+AGAAG	(6,4)
+AGCTA	(2,-3)
+AGCTA	(4,-3)
+AGCTT	(2,4)
+AGCTT	(4,4)
+ATAGA	(1,2)
+ATAGA	(3,2)
+ATAGA	(5,2)
+ATAGC	(2,2)
+ATAGC	(4,2)
+CTATT	(1,-1)
+CTATT	(2,-1)
+CTATT	(3,-1)
+CTATT	(4,-1)
+CTATT	(5,-1)
+CTTCT	(1,-4)
+CTTCT	(3,-4)
+CTTCT	(5,-4)
+CTTCT	(6,-1)
+CTTCT	(6,-4)
+GAAGA	(6,2)
+GCTAT	(2,-2)
+GCTAT	(4,-2)
+TAGAA	(1,3)
+TAGAA	(3,3)
+TAGAA	(5,3)
+TAGCT	(2,3)
+TAGCT	(4,3)
+TCTAT	(1,-2)
+TCTAT	(3,-2)
+TCTAT	(5,-2)
+TCTTC	(6,-2)
+TTCTA	(1,-3)
+TTCTA	(3,-3)
+TTCTA	(5,-3)
+TTCTT	(6,-3)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
index 2585102..b52a848 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -1,24 +1,48 @@
-1	0	[]	AATAG
-1	1	[]	ATAGA
-1	2	[]	TAGAA
-1	3	[(6,0)]	AGAAG
-2	0	[]	AATAG
-2	1	[]	ATAGA
-2	2	[]	TAGAA
-2	3	[(6,0)]	AGAAG
-3	0	[]	AATAG
-3	1	[]	ATAGA
-3	2	[]	TAGAA
-3	3	[(6,0)]	AGAAG
-4	0	[]	AATAG
-4	1	[]	ATAGA
-4	2	[]	TAGAA
-4	3	[(6,0)]	AGAAG
-5	0	[]	AATAG
-5	1	[]	ATAGA
-5	2	[]	TAGAA
-5	3	[(6,0)]	AGAAG
-6	0	[(1,3),(2,3),(3,3),(5,3),(4,3)]	AGAAG
-6	1	[]	GAAGA
-6	2	[]	AAGAA
-6	3	[]	AGAAG
+1	-1	[]	
+1	-2	[]	
+1	-3	[]	
+1	-4	[(6,-1)]	
+1	1	[]	AATAG
+1	2	[]	ATAGA
+1	3	[]	TAGAA
+1	4	[(6,1)]	AGAAG
+2	-1	[]	
+2	-2	[]	
+2	-3	[]	
+2	-4	[]	
+2	1	[]	AATAG
+2	2	[]	ATAGC
+2	3	[]	TAGCT
+2	4	[]	AGCTT
+3	-1	[]	
+3	-2	[]	
+3	-3	[]	
+3	-4	[(6,-1)]	
+3	1	[]	AATAG
+3	2	[]	ATAGA
+3	3	[]	TAGAA
+3	4	[(6,1)]	AGAAG
+4	-1	[]	
+4	-2	[]	
+4	-3	[]	
+4	-4	[]	
+4	1	[]	AATAG
+4	2	[]	ATAGC
+4	3	[]	TAGCT
+4	4	[]	AGCTT
+5	-1	[]	
+5	-2	[]	
+5	-3	[]	
+5	-4	[(6,-1)]	
+5	1	[]	AATAG
+5	2	[]	ATAGA
+5	3	[]	TAGAA
+5	4	[(6,1)]	AGAAG
+6	-1	[(1,-4),(3,-4),(5,-4)]	
+6	-2	[]	
+6	-3	[]	
+6	-4	[]	
+6	1	[(1,4),(3,4),(5,4)]	AGAAG
+6	2	[]	GAAGA
+6	3	[]	AAGAA
+6	4	[]	AGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
index 499200a..9035f33 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -1,6 +1,18 @@
-AAGAA	[(6,2)]
-AATAG	[(1,0),(2,0),(3,0),(4,0),(5,0)]
-AGAAG	[(1,3),(2,3),(3,3),(4,3),(5,3),(6,0),(6,3)]
-ATAGA	[(1,1),(2,1),(3,1),(4,1),(5,1)]
-GAAGA	[(6,1)]
-TAGAA	[(1,2),(2,2),(3,2),(4,2),(5,2)]
\ No newline at end of file
+AAGAA	[(6,3)]
+AAGCT	[(2,-4),(4,-4)]
+AATAG	[(1,1),(2,1),(3,1),(4,1),(5,1)]
+AGAAG	[(1,4),(3,4),(5,4),(6,1),(6,4)]
+AGCTA	[(2,-3),(4,-3)]
+AGCTT	[(2,4),(4,4)]
+ATAGA	[(1,2),(3,2),(5,2)]
+ATAGC	[(2,2),(4,2)]
+CTATT	[(1,-1),(2,-1),(3,-1),(4,-1),(5,-1)]
+CTTCT	[(1,-4),(3,-4),(5,-4),(6,-1),(6,-4)]
+GAAGA	[(6,2)]
+GCTAT	[(2,-2),(4,-2)]
+TAGAA	[(1,3),(3,3),(5,3)]
+TAGCT	[(2,3),(4,3)]
+TCTAT	[(1,-2),(3,-2),(5,-2)]
+TCTTC	[(6,-2)]
+TTCTA	[(1,-3),(3,-3),(5,-3)]
+TTCTT	[(6,-3)]
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
index 7d2e065..2cc283d 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
@@ -1,6 +1,6 @@
-1	0	[]	AATAG	1	[]	ATAGA	2	[]	TAGAA	3	[(6,0)]	AGAAG	
-2	0	[]	AATAG	1	[]	ATAGA	2	[]	TAGAA	3	[(6,0)]	AGAAG	
-3	0	[]	AATAG	1	[]	ATAGA	2	[]	TAGAA	3	[(6,0)]	AGAAG	
-4	0	[]	AATAG	1	[]	ATAGA	2	[]	TAGAA	3	[(6,0)]	AGAAG	
-5	0	[]	AATAG	1	[]	ATAGA	2	[]	TAGAA	3	[(6,0)]	AGAAG	
-6	0	[(1,3),(2,3),(3,3),(5,3),(4,3)]	AGAAG	1	[]	GAAGA	2	[]	AAGAA	3	[]	AGAAG	
+1	1	[]	AATAG	-1	[]		2	[]	ATAGA	-2	[]		3	[]	TAGAA	-3	[]		4	[(6,1)]	AGAAG	-4	[(6,-1)]		
+2	1	[]	AATAG	-1	[]		2	[]	ATAGC	-2	[]		3	[]	TAGCT	-3	[]		4	[]	AGCTT	-4	[]		
+3	1	[]	AATAG	-1	[]		2	[]	ATAGA	-2	[]		3	[]	TAGAA	-3	[]		4	[(6,1)]	AGAAG	-4	[(6,-1)]		
+4	1	[]	AATAG	-1	[]		2	[]	ATAGC	-2	[]		3	[]	TAGCT	-3	[]		4	[]	AGCTT	-4	[]		
+5	1	[]	AATAG	-1	[]		2	[]	ATAGA	-2	[]		3	[]	TAGAA	-3	[]		4	[(6,1)]	AGAAG	-4	[(6,-1)]		
+6	1	[(3,4),(1,4),(5,4)]	AGAAG	-1	[(1,-4),(3,-4),(5,-4)]		2	[]	GAAGA	-2	[]		3	[]	AAGAA	-3	[]		4	[]	AGAAG	-4	[]		