Merge branch 'genomix/fullstack_genomix' of https://code.google.com/p/hyracks into jianfeng/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
index a030f0b..86b9117 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
@@ -49,4 +49,5 @@
         return strKmer.toString();
     }
 
+
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
index 219def6..ecbf1ec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
@@ -12,4 +12,11 @@
         bytes[offset + 2] = (byte)((val >>>  8) & 0xFF);
         bytes[offset + 3] = (byte)((val >>>  0) & 0xFF);
     }
+    
+    public static int hashBytes(byte[] bytes, int offset, int length) {
+        int hash = 1;
+        for (int i = offset; i < offset + length; i++)
+            hash = (31 * hash) + (int) bytes[i];
+        return hash;
+    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 5be5f83..d9f5c48 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -48,98 +48,22 @@
         }
         return r;
     }
+    
+    public static byte getPairedGeneCode(byte genecode){
+        if ( genecode < 0 || genecode > 3){
+            throw new IllegalArgumentException("Invalid genecode");
+        }
+        return (byte) (3- genecode);
+    }
+    
+    public static byte getPairedCodeFromSymbol(byte ch){
+        return getPairedGeneCode(getCodeFromSymbol(ch));
+    }
 
     public static byte getSymbolFromCode(byte code) {
-        if (code > 3) {
-            return '!';
+        if (code > 3 || code < 0 ) {
+            throw new IllegalArgumentException("Invalid genecode");
         }
         return GENE_SYMBOL[code];
     }
-
-    public static byte getAdjBit(byte t) {
-        byte r = 0;
-        switch (t) {
-            case 'A':
-            case 'a':
-                r = 1 << A;
-                break;
-            case 'C':
-            case 'c':
-                r = 1 << C;
-                break;
-            case 'G':
-            case 'g':
-                r = 1 << G;
-                break;
-            case 'T':
-            case 't':
-                r = 1 << T;
-                break;
-        }
-        return r;
-    }
-
-    /**
-     * It works for path merge. Merge the kmer by his next, we need to make sure
-     * the @{t} is a single neighbor.
-     * 
-     * @param t
-     *            the neighbor code in BitMap
-     * @return the genecode
-     */
-    public static byte getGeneCodeFromBitMap(byte t) {
-        switch (t) {
-            case 1 << A:
-                return A;
-            case 1 << C:
-                return C;
-            case 1 << G:
-                return G;
-            case 1 << T:
-                return T;
-        }
-        return -1;
-    }
-
-    public static byte getBitMapFromGeneCode(byte t) {
-        return (byte) (1 << t);
-    }
-
-    public static int countNumberOfBitSet(int i) {
-        int c = 0;
-        for (; i != 0; c++) {
-            i &= i - 1;
-        }
-        return c;
-    }
-
-    public static int inDegree(byte bitmap) {
-        return countNumberOfBitSet((bitmap >> 4) & 0x0f);
-    }
-
-    public static int outDegree(byte bitmap) {
-        return countNumberOfBitSet(bitmap & 0x0f);
-    }
-
-    public static byte mergePreNextAdj(byte pre, byte next) {
-        return (byte) (pre << 4 | (next & 0x0f));
-    }
-
-    public static String getSymbolFromBitMap(byte code) {
-        int left = (code >> 4) & 0x0F;
-        int right = code & 0x0F;
-        StringBuilder str = new StringBuilder();
-        for (int i = A; i <= T; i++) {
-            if ((left & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        str.append('|');
-        for (int i = A; i <= T; i++) {
-            if ((right & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        return str.toString();
-    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 78d8d85..40d8a23 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -204,8 +204,9 @@
     }
 
     /**
-     * Compress Reversed Kmer into bytes array AATAG will compress as
-     * [0x000A,0xATAG]
+     * Compress the reversed read into the bytes array,
+     * e.g. AATAG will be paired to CTATT, and then compressed as
+     * [0x000T,0xTATC]
      * 
      * @param input
      *            array
@@ -217,7 +218,7 @@
         int bytecount = 0;
         int bcount = size - 1;
         for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
-            byte code = GeneCode.getCodeFromSymbol(array[i]);
+            byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
@@ -308,7 +309,7 @@
      *            : next neighbor in gene-code format
      * @return the merged Kmer, this K of this Kmer is k+1
      */
-    public void mergeKmerWithNextCode(byte nextCode) {
+    public void mergeNextCode(byte nextCode) {
         this.kmerlength += 1;
         setSize(KmerUtil.getByteNumFromK(kmerlength));
         if (kmerlength % 4 == 1) {
@@ -321,6 +322,57 @@
         }
         clearLeadBit();
     }
+    
+    public void mergePreCode(byte preCode) {
+        //TODO
+        return;
+    }
+
+    /**
+     * Merge Kmer with the next connected Kmer
+     * e.g. AAGCTAA merge with AACAACC, if the initial kmerSize = 3
+     * then it will return AAGCTAACAACC
+     * 
+     * @param initialKmerSize
+     *            : the initial kmerSize
+     * @param kmer
+     */
+    @SuppressWarnings("unchecked")
+    public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+        if (kmer.getKmerLength() == initialKmerSize){
+            mergeNextCode(kmer.getGeneCodeAtPosition(kmer.getKmerLength()-1));
+            return;
+        }
+        int preKmerLength = kmerlength;
+        int preSize = size;
+        this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        int i = 1;
+        for (; i <= preSize; i++) {
+            bytes[offset + size - i] = bytes[offset + preSize - i];
+        }
+        if (i > 1) {
+            i--;
+        }
+        if (preKmerLength % 4 == 0) {
+            for (int j = 1; j <= kmer.getLength() && offset + size >= i + j; j++) {
+                bytes[offset + size - i - j] = kmer.getBytes()[kmer.getOffset() + kmer.getLength() - j];
+            }
+        } else {
+            int posNeedToMove = ((preKmerLength % 4) << 1);
+            bytes[offset + size - i] |= kmer.getBytes()[kmer.getOffset() + kmer.getLength() - 1] << posNeedToMove;
+            for (int j = 1; j <= kmer.getLength() && offset + size - i - j >= 0; j++) {
+                bytes[offset + size - i - j] = (byte) (((kmer.getBytes()[kmer.getOffset() + kmer.getLength() - j] & 0xff) >> (8 - posNeedToMove)) | (kmer
+                        .getBytes()[kmer.getOffset() + kmer.getLength() - j - 1] << posNeedToMove));
+            }
+        }
+        clearLeadBit();
+    }
+    
+    public void mergePreKmer(int initialKmerSize, KmerBytesWritable kmer){
+        //TODO
+        return;
+    }
 
     protected void clearLeadBit() {
         if (kmerlength % 4 != 0) {
@@ -390,4 +442,5 @@
     static { // register this comparator
         WritableComparator.define(KmerBytesWritable.class, new Comparator());
     }
+
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 73612e2..df4e755 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,10 +31,6 @@
         kmer = new KmerBytesWritable(kmerSize);
     }
 
-    public int getCount() {
-        return kmer.getKmerLength();
-    }
-
     public void setNodeID(PositionWritable ref) {
         this.setNodeID(ref.getReadID(), ref.getPosInRead());
     }
@@ -78,9 +74,18 @@
         return kmer;
     }
 
-    public void mergeNextWithinOneRead(NodeWritable nextNodeEntry) {
-        this.outgoingList.set(nextNodeEntry.outgoingList);
-        kmer.mergeKmerWithNextCode(nextNodeEntry.kmer.getGeneCodeAtPosition(nextNodeEntry.getCount() - 1));
+    public int getCount() {
+        return kmer.getKmerLength();
+    }
+
+    public void mergeNext(NodeWritable nextNode, int initialKmerSize) {
+        this.outgoingList.set(nextNode.outgoingList);
+        kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
+    }
+    
+    public void mergePre(NodeWritable preNode, int initialKmerSize){
+        this.incomingList.set(preNode.incomingList);
+        kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
     }
 
     public void set(NodeWritable node) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 42ff47a..132b464 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -67,7 +67,11 @@
     public int getLength() {
         return LENGTH;
     }
-
+    
+    public boolean isSameReadID(PositionWritable other){
+        return getReadID() == other.getReadID();
+    }
+    
     @Override
     public void readFields(DataInput in) throws IOException {
         in.readFully(storage, offset, LENGTH);
@@ -80,7 +84,7 @@
 
     @Override
     public int hashCode() {
-        return this.getReadID();
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
     }
 
     @Override
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index b7dd8f1..04fc3bb 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -59,7 +59,7 @@
         Assert.assertEquals(kmer.toString(), "AATAGAA");
 
         kmer.setByReadReverse(array, 1);
-        Assert.assertEquals(kmer.toString(), "GAAGATA");
+        Assert.assertEquals(kmer.toString(), "CTTCTAT");
     }
 
     @Test
@@ -100,11 +100,35 @@
         String text = "AGCTGACCG";
         for (int i = 0; i < 10; i++) {
             for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
-                kmer.mergeKmerWithNextCode(x);
+                kmer.mergeNextCode(x);
                 text = text + (char) GeneCode.GENE_SYMBOL[x];
                 Assert.assertEquals(text, kmer.toString());
             }
         }
     }
+    
+    @Test
+    public void TestMergeNextKmer(){
+        byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+        KmerBytesWritable kmer1 = new KmerBytesWritable(9);
+        kmer1.setByRead(array, 0);
+        String text1 = "AGCTGACCG";
+        KmerBytesWritable kmer2 = new KmerBytesWritable(9);
+        kmer2.setByRead(array, 1);
+        String text2 = "GCTGACCGT";
+        Assert.assertEquals(text1, kmer1.toString());
+        Assert.assertEquals(text2, kmer2.toString());
+
+        KmerBytesWritable merged = new KmerBytesWritable(kmer1);
+        merged.mergeNextKmer(9, kmer2);
+        Assert.assertEquals("AGCTGACCGT", merged.toString()); // compare the decoded text; a String never equals the writable itself
+
+        KmerBytesWritable kmer3 = new KmerBytesWritable(3);
+        kmer3.setByRead(array, 1);
+        String text3 = "GCT";
+        Assert.assertEquals(text3, kmer3.toString());
+        merged.mergeNextKmer(1, kmer3);
+        Assert.assertEquals(text1 + text3, merged.toString());
+    }
 
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index d3c2ff4..7b65158 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -37,12 +37,12 @@
         output.collect(outputKmer, outputVertexID);
         /** middle kmer */
         for (int i = KMER_SIZE; i < array.length - 1; i++) {
-            GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
+            outputKmer.shiftKmerWithNextChar(array[i]);
             outputVertexID.set((int)key.get(), (byte)(i - KMER_SIZE + 1));
             output.collect(outputKmer, outputVertexID);
         }
         /** last kmer */
-        GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
+        outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
         outputVertexID.set((int)key.get(), (byte)(array.length - 1 + 1));
         output.collect(outputKmer, outputVertexID);
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 98b6ca1..49dd263 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -5,7 +5,11 @@
 
 public class PositionListReference extends PositionListWritable implements  IValueReference {
 
-	/**
+	public PositionListReference(int countByDataLength, byte[] byteArray, int startOffset) {
+	    super(countByDataLength, byteArray, startOffset);
+    }
+
+    /**
 	 * 
 	 */
 	private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 17ebde5..53bd79d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -1,8 +1,12 @@
 package edu.uci.ics.genomix.hyracks.dataflow;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -99,7 +103,6 @@
             for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
                 positionEntry.setNewReference(data, offsetPoslist + i);
                 if (positionEntry.getPosInRead() == 0) {
-                    //TODO remove the pos of the same readID
                     zeroPositionCollection2.append(positionEntry);
                 } else {
                     noneZeroPositionCollection2.append(positionEntry);
@@ -131,7 +134,8 @@
                 if (posList2 == null) {
                     builder2.addFieldEndOffset();
                 } else {
-                    builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+                    writePosToFieldAndSkipSameReadID(pos, builder2.getDataOutput(), posList2);
+                    builder2.addFieldEndOffset();
                 }
                 // set kmer, may not useful
                 byte[] data = accessor.getBuffer().array();
@@ -153,6 +157,22 @@
             }
         }
 
+        private void writePosToFieldAndSkipSameReadID(PositionReference pos, DataOutput ds,
+                ArrayBackedValueStorage posList2) throws HyracksDataException {
+
+            PositionListWritable plist = new PositionListWritable(PositionListWritable.getCountByDataLength(posList2
+                    .getLength()), posList2.getByteArray(), posList2.getStartOffset());
+            for (PositionWritable p : plist) {
+                if (!pos.isSameReadID(p)) {
+                    try {
+                        ds.write(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+        }
+
         @Override
         public void fail() throws HyracksDataException {
             writer.fail();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index d27c73d..d80ed13 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -6,6 +6,7 @@
 import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,6 +64,7 @@
 
         private NodeReference curNodeEntry;
         private NodeReference nextNodeEntry;
+        private NodeReference nextNextNodeEntry;
 
         public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
                 RecordDescriptor outputRecDesc) {
@@ -71,6 +73,7 @@
             this.outputRecDesc = outputRecDesc;
             curNodeEntry = new NodeReference(kmerSize);
             nextNodeEntry = new NodeReference(kmerSize);
+            nextNextNodeEntry = new NodeReference(0);
         }
 
         @Override
@@ -98,25 +101,58 @@
             int readID = accessor.getBuffer().getInt(
                     offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
             resetNode(curNodeEntry, readID, (byte) 0,
-                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), false);
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), true);
 
             for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
                 resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
                         offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
+                NodeReference pNextNext = null;
+                if (i + 1 < accessor.getFieldCount()) {
+                    resetNode(nextNextNodeEntry, readID, (byte) (i - InputInfoFieldStart + 1),
+                            offsetPoslist + accessor.getFieldStartOffset(tIndex, i + 1), false);
+                    pNextNext = nextNextNodeEntry;
+                }
+
                 if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
-                    curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
-                } else {
-                    curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+                    if (pNextNext == null || pNextNext.getOutgoingList().getCountOfPosition() == 0) {
+                        curNodeEntry.mergeNext(nextNodeEntry, kmerSize);
+                    } else {
+                        curNodeEntry.getOutgoingList().reset();
+                        curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+                        outputNode(curNodeEntry);
+
+                        nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+                        curNodeEntry.set(nextNodeEntry);
+                    }
+                } else { // nextNode entry outgoing > 0
+                    curNodeEntry.getOutgoingList().set(nextNodeEntry.getOutgoingList());
                     curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
-                    outputNode(curNodeEntry);
                     nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+                    outputNode(curNodeEntry);
                     curNodeEntry.set(nextNodeEntry);
+                    curNodeEntry.getOutgoingList().reset();
                 }
             }
             outputNode(curNodeEntry);
         }
 
-        private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean byRef) {
+        /**
+         * The neighbor list is stored in the next-next node
+         * (3,2) [(1,0),(2,0)] will become the outgoing list of node (3,1)
+         * (3,1) [(1,0),(2,0)]
+         * 
+         * @param curNodeEntry2
+         * @param nextNodeEntry2
+         */
+        //        private void setOutgoingByNext(NodeReference curNodeEntry2, NodeReference nextNodeEntry2) {
+        //            if (nextNodeEntry2 == null) {
+        //                curNodeEntry2.getOutgoingList().reset();
+        //            } else {
+        //                curNodeEntry2.setOutgoingList(nextNodeEntry2.getOutgoingList());
+        //            }
+        //        }
+
+        private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean isInitial) {
             node.reset(kmerSize);
             node.setNodeID(readID, posInRead);
 
@@ -125,32 +161,39 @@
             int countPosition = PositionListWritable.getCountByDataLength(lengthPos);
             offset += INT_LENGTH;
             if (posInRead == 0) {
-                setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, byRef);
+                setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, true);
+                // minus 1 position of the incoming list to get the correct predecessor
+                for (PositionWritable pos : node.getIncomingList()) {
+                    if (pos.getPosInRead() == 0) {
+                        throw new IllegalArgumentException("The incoming position list contain invalid posInRead");
+                    }
+                    pos.set(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+                }
             } else {
-                setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, byRef);
+                setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, isInitial);
             }
             offset += lengthPos;
             int lengthKmer = buffer.getInt(offset);
             if (node.getKmer().getLength() != lengthKmer) {
                 throw new IllegalStateException("Size of Kmer is invalid ");
             }
-            setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, byRef);
+            setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, isInitial);
         }
 
-        private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean byRef) {
-            if (byRef) {
-                kmer.setNewReference(array, offset);
-            } else {
+        private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean isInitial) {
+            if (isInitial) {
                 kmer.set(array, offset);
+            } else {
+                kmer.setNewReference(array, offset);
             }
         }
 
         private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset,
-                boolean byRef) {
-            if (byRef) {
-                positionListWritable.setNewReference(count, array, offset);
-            } else {
+                boolean isInitial) {
+            if (isInitial) {
                 positionListWritable.set(count, array, offset);
+            } else {
+                positionListWritable.setNewReference(count, array, offset);
             }
         }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 37d5289..855be64 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.io.Text;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.GeneCode;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -115,7 +116,7 @@
                     InsertToFrame(kmer, -readID, array.length - k, writer);
                     /** middle kmer */
                     for (int i = k; i < array.length; i++) {
-                        kmer.shiftKmerWithPreChar(array[i]);
+                        kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
                         InsertToFrame(kmer, -readID, array.length - i - 1, writer);
                     }
                 }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
deleted file mode 100644
index 216f631..0000000
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.test;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hyracks.driver.Driver;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-
-public class JobKmerGroupbyTest {
-    private static final String ACTUAL_RESULT_DIR = "actual";
-    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
-    private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
-    private static final String HDFS_INPUT_PATH = "/webmap";
-    private static final String HDFS_OUTPUT_PATH = "/webmap_result";
-
-    private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
-    private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
-    private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
-    private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
-
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private MiniDFSCluster dfsCluster;
-
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
-    private int numPartitionPerMachine = 1;
-
-    private Driver driver;
-
-    @Before
-    public void setUp() throws Exception {
-        cleanupStores();
-        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
-
-        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
-        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
-        conf.setInt(GenomixJobConf.KMER_LENGTH, 5);
-        driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
-                edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
-    }
-
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
-
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_PATH);
-        Path dest = new Path(HDFS_INPUT_PATH);
-        dfs.mkdirs(dest);
-        //dfs.mkdirs(result);
-        dfs.copyFromLocalFile(src, dest);
-
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        conf.writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
-
-    private void cleanUpReEntry() throws IOException {
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        if (lfs.exists(new Path(DUMPED_RESULT))) {
-            lfs.delete(new Path(DUMPED_RESULT), true);
-        }
-        FileSystem dfs = FileSystem.get(conf);
-        if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
-            dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
-        }
-    }
-
-    @Test
-    public void TestAll() throws Exception {
-        cleanUpReEntry();
-        TestExternalGroupby();
-        cleanUpReEntry();
-        TestPreClusterGroupby();
-        cleanUpReEntry();
-        TestHybridGroupby();
-        cleanUpReEntry();
-        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
-        TestExternalReversedGroupby();
-        cleanUpReEntry();
-        TestPreClusterReversedGroupby();
-        cleanUpReEntry();
-        TestHybridReversedGroupby();
-    }
-
-    public void TestExternalGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
-        System.err.println("Testing ExternalGroupBy");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestPreClusterGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
-        System.err.println("Testing PreClusterGroupBy");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestHybridGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
-        System.err.println("Testing HybridGroupBy");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_PATH));
-    }
-
-    public void TestExternalReversedGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
-        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
-        System.err.println("Testing ExternalGroupBy + Reversed");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
-    public void TestPreClusterReversedGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
-        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
-        System.err.println("Testing PreclusterGroupBy + Reversed");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
-    public void TestHybridReversedGroupby() throws Exception {
-        conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
-        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
-        System.err.println("Testing HybridGroupBy + Reversed");
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
-    }
-
-    private boolean checkResults(String expectedPath) throws Exception {
-        File dumped = null;
-        String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
-        if ("text".equalsIgnoreCase(format)) {
-            FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
-                    FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
-            dumped = new File(DUMPED_RESULT);
-        } else {
-
-            FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
-            File filePathTo = new File(CONVERT_RESULT);
-            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-            for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
-                String partname = "/part-" + i;
-                //				FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-                //						+ partname), FileSystem.getLocal(new Configuration()),
-                //						new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
-
-                Path path = new Path(HDFS_OUTPUT_PATH + partname);
-                FileSystem dfs = FileSystem.get(conf);
-                if (dfs.getFileStatus(path).getLen() == 0) {
-                    continue;
-                }
-                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
-                //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH,
-                        GenomixJobConf.DEFAULT_KMERLEN));
-//                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                KmerBytesWritable value = null;
-                while (reader.next(key, value)) {
-                    if (key == null || value == null) {
-                        break;
-                    }
-                    bw.write(key.toString() + "\t" + value.toString());
-                    System.out.println(key.toString() + "\t" + value.toString());
-                    bw.newLine();
-                }
-                reader.close();
-            }
-            bw.close();
-            dumped = new File(CONVERT_RESULT);
-        }
-
-        TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
-        return true;
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
-        cleanupHDFS();
-    }
-
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index ca9ada0..8451567 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -93,7 +93,7 @@
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, null));
+        Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2}));
     }
 
     public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
index 01d4a27..2988303 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
@@ -1,12 +1,16 @@
-((1,0)	[]	[(6,0),(1,3)]	AATAGAA)
-((1,3)	[(1,0)]	[(6,0)]	AGAAG)
-((2,0)	[]	[(6,0),(2,3)]	AATAGAA)
-((2,3)	[(2,0)]	[(6,0)]	AGAAG)
-((3,0)	[]	[(6,0),(3,3)]	AATAGAA)
-((3,3)	[(3,0)]	[(6,0)]	AGAAG)
-((4,0)	[]	[(6,0),(4,3)]	AATAGAA)
-((4,3)	[(4,0)]	[(6,0)]	AGAAG)
-((5,0)	[]	[(6,0),(5,3)]	AATAGAA)
-((5,3)	[(5,0)]	[(6,0)]	AGAAG)
-((6,0)	[(1,3),(2,3),(3,3),(5,3),(6,3),(4,3)]	[(6,0),(6,3)]	AGAAGAA)
-((6,3)	[(6,0)]	[(6,0)]	AGAAG)
+((1,0)	[]	[(1,2)]	AATAGA)
+((1,2)	[(1,0)]	[(6,0),(1,3)]	TAGAA)
+((1,3)	[(1,2)]	[]	AGAAG)
+((2,0)	[]	[(2,2)]	AATAGA)
+((2,2)	[(2,0)]	[(6,0),(2,3)]	TAGAA)
+((2,3)	[(2,2)]	[]	AGAAG)
+((3,0)	[]	[(3,2)]	AATAGA)
+((3,2)	[(3,0)]	[(6,0),(3,3)]	TAGAA)
+((3,3)	[(3,2)]	[]	AGAAG)
+((4,0)	[]	[(4,2)]	AATAGA)
+((4,2)	[(4,0)]	[(6,0),(4,3)]	TAGAA)
+((4,3)	[(4,2)]	[]	AGAAG)
+((5,0)	[]	[(5,2)]	AATAGA)
+((5,2)	[(5,0)]	[(6,0),(5,3)]	TAGAA)
+((5,3)	[(5,2)]	[]	AGAAG)
+((6,0)	[(1,2),(2,2),(3,2),(5,2),(4,2)]	[]	AGAAGAAG)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
index 0ca9de6..2585102 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -18,7 +18,7 @@
 5	1	[]	ATAGA
 5	2	[]	TAGAA
 5	3	[(6,0)]	AGAAG
-6	0	[(1,3),(2,3),(3,3),(4,3),(5,3),(6,3)]	AGAAG
+6	0	[(1,3),(2,3),(3,3),(5,3),(4,3)]	AGAAG
 6	1	[]	GAAGA
 6	2	[]	AAGAA
-6	3	[(6,0)]	AGAAG
\ No newline at end of file
+6	3	[]	AGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
index 82d4692..1cd4274 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
@@ -3,4 +3,4 @@
 3	0 [] AATAG	1 [] ATAGA	2 [] TAGAA	3 [(6,0)] AGAAG	
 4	0 [] AATAG	1 [] ATAGA	2 [] TAGAA	3 [(6,0)] AGAAG	
 5	0 [] AATAG	1 [] ATAGA	2 [] TAGAA	3 [(6,0)] AGAAG	
-6	0 [(1,3),(2,3),(3,3),(5,3),(6,3),(4,3)] AGAAG	1 [] GAAGA	2 [] AAGAA	3 [(6,0)] AGAAG	
+6	0 [(1,3),(2,3),(3,3),(5,3),(4,3)] AGAAG	1 [] GAAGA	2 [] AAGAA	3 [] AGAAG