Merge commit '50e9f8614b4124288f606d90c3417caa2b5f80de' into nanzhang/hyracks_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
new file mode 100644
index 0000000..f7caebb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -0,0 +1,350 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+/**
+ * A list of fixed-length kmers. The length of this list is stored internally.
+ *
+ * Layout inside {@code storage}, starting at {@code offset}: a {@code HEADER_SIZE}-byte element count followed by
+ * {@code valueCount} kmers of {@code KmerBytesWritable.getBytesPerKmer()} bytes each.
+ */
+public class KmerListWritable implements Writable, Iterable<KmerBytesWritable>, Serializable {
+    private static final long serialVersionUID = 1L;
+    protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 };
+    protected static final int HEADER_SIZE = 4;
+
+    protected byte[] storage;
+    protected int offset;
+    protected int valueCount;
+    // Index one past the last byte of storage this list may use; since we may be a reference inside a larger
+    // datablock, this can be smaller than storage.length.
+    protected int storageMaxSize;
+
+    // Cursor handed out by getPosition() and the iterator; it is re-pointed on every call, never copied.
+    private KmerBytesWritable posIter = new KmerBytesWritable();
+
+    /** Creates an empty list. */
+    public KmerListWritable() {
+        storage = EMPTY_BYTES;
+        valueCount = 0;
+        offset = 0;
+        storageMaxSize = storage.length;
+    }
+
+    /** Creates a list that references (does not copy) the serialized list in {@code data} at {@code offset}. */
+    public KmerListWritable(byte[] data, int offset) {
+        setNewReference(data, offset);
+    }
+
+    /** Creates a list holding a copy of every kmer in {@code kmers}, in iteration order. */
+    public KmerListWritable(List<KmerBytesWritable> kmers) {
+        this();
+        setSize(listBytes(kmers.size())); // reserve space for all elements
+        for (KmerBytesWritable kmer : kmers) {
+            append(kmer);
+        }
+    }
+
+    /**
+     * Points this list at the serialized list stored in {@code data} starting at {@code offset}. No bytes are
+     * copied; the element count is read from the header.
+     *
+     * @throws IllegalArgumentException
+     *             if the header holds a negative count or the buffer cannot hold the header plus all elements
+     */
+    public void setNewReference(byte[] data, int offset) {
+        int newValueCount = Marshal.getInt(data, offset);
+        int available = data.length - offset;
+        if (newValueCount < 0) {
+            throw new IllegalArgumentException("Negative element count (" + newValueCount + ") in list header!");
+        }
+        // the header occupies HEADER_SIZE bytes in front of the elements; use long math to avoid overflow
+        if ((long) newValueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE > available) {
+            throw new IllegalArgumentException("Specified data buffer (len=" + available
+                    + ") is not large enough to store requested number of elements (" + newValueCount + ")!");
+        }
+        this.storage = data;
+        this.offset = offset;
+        this.valueCount = newValueCount;
+        // getCapacity() subtracts offset, so the limit has to be an index into storage rather than a length
+        this.storageMaxSize = offset + listBytes(newValueCount);
+    }
+
+    /** Appends a copy of {@code kmer} to the end of this list, growing the storage if necessary. */
+    public void append(KmerBytesWritable kmer) {
+        setSize(listBytes(valueCount + 1));
+        System.arraycopy(kmer.getBytes(), kmer.offset, storage, elementOffset(valueCount),
+                KmerBytesWritable.getBytesPerKmer());
+        valueCount += 1;
+        Marshal.putInt(valueCount, storage, offset);
+    }
+
+    /**
+     * Appends every element of {@code otherList} to the end of this list. Duplicates are kept.
+     */
+    public void appendList(KmerListWritable otherList) {
+        if (otherList.valueCount > 0) {
+            setSize(listBytes(valueCount + otherList.valueCount));
+            // copy contents of otherList (without its header) into the end of my storage
+            System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, storage, elementOffset(valueCount),
+                    otherList.valueCount * KmerBytesWritable.getBytesPerKmer());
+            valueCount += otherList.valueCount;
+            Marshal.putInt(valueCount, storage, offset);
+        }
+    }
+
+    /**
+     * Save the union of my list and otherList. Uses a temporary HashSet for uniquefication; the resulting element
+     * order is whatever the HashSet iteration yields.
+     */
+    public void unionUpdate(KmerListWritable otherList) {
+        int newSize = valueCount + otherList.valueCount;
+        HashSet<KmerBytesWritable> uniqueElements = new HashSet<KmerBytesWritable>(newSize);
+        addCopies(this, uniqueElements);
+        addCopies(otherList, uniqueElements);
+        valueCount = 0;
+        setSize(listBytes(newSize)); // upper bound on memory usage
+        for (KmerBytesWritable kmer : uniqueElements) {
+            append(kmer);
+        }
+        // append() maintains the header, but it never runs when the union is empty
+        Marshal.putInt(valueCount, storage, offset);
+    }
+
+    /**
+     * Adds an independent copy of every kmer in {@code source} to {@code target}. Copies are required because
+     * iteration re-points one shared cursor object, so storing the iterated references would store the same
+     * object repeatedly and leave it dangling once the storage is rewritten.
+     */
+    private static void addCopies(KmerListWritable source, HashSet<KmerBytesWritable> target) {
+        for (KmerBytesWritable kmer : source) {
+            KmerBytesWritable copy = new KmerBytesWritable();
+            copy.setAsCopy(kmer);
+            target.add(copy);
+        }
+    }
+
+    /** Ensures the storage can hold {@code size} bytes, over-allocating by half to amortize growth. */
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+
+    /** Returns the number of bytes available to this list, counted from {@code offset}. */
+    protected int getCapacity() {
+        return storageMaxSize - offset;
+    }
+
+    /**
+     * Grows the storage to {@code new_cap} bytes if that exceeds the current capacity. The current contents are
+     * moved to the start of a freshly allocated array, so a referenced buffer is no longer shared afterwards.
+     */
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (valueCount > 0) {
+                System.arraycopy(storage, offset, new_data, 0, listBytes(valueCount));
+            }
+            storage = new_data;
+            offset = 0;
+            storageMaxSize = storage.length;
+        }
+    }
+
+    /** Empties the list in place; the storage is kept and its header is rewritten to zero. */
+    public void reset() {
+        valueCount = 0;
+        Marshal.putInt(valueCount, storage, offset);
+    }
+
+    /**
+     * Returns a view of the i'th kmer. The returned object is shared: it is re-pointed by the next call to this
+     * method or to the iterator, so copy it if it must outlive that call.
+     *
+     * @throws ArrayIndexOutOfBoundsException
+     *             if {@code i} is not in {@code [0, valueCount)}
+     */
+    public KmerBytesWritable getPosition(int i) {
+        if (i < 0 || i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such positions");
+        }
+        posIter.setAsReference(storage, elementOffset(i));
+        return posIter;
+    }
+
+    /** Replaces my contents with a copy of {@code otherList}. */
+    public void setCopy(KmerListWritable otherList) {
+        setCopy(otherList.storage, otherList.offset);
+    }
+
+    /**
+     * read a KmerListWritable from newData, which should include the header
+     */
+    public void setCopy(byte[] newData, int offset) {
+        int newValueCount = Marshal.getInt(newData, offset);
+        setSize(listBytes(newValueCount));
+        if (newValueCount > 0) {
+            System.arraycopy(newData, offset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newValueCount
+                    * KmerBytesWritable.getBytesPerKmer());
+        }
+        valueCount = newValueCount;
+        Marshal.putInt(valueCount, storage, this.offset);
+    }
+
+    /**
+     * Returns an iterator over the kmers. Every call to {@code next()} returns the same shared cursor object,
+     * re-pointed at the next element.
+     */
+    @Override
+    public Iterator<KmerBytesWritable> iterator() {
+        return new Iterator<KmerBytesWritable>() {
+
+            private int currentIndex = 0; // index of the element the next call to next() returns
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public KmerBytesWritable next() {
+                KmerBytesWritable kmer = getPosition(currentIndex); // throws before the index is advanced
+                currentIndex++;
+                return kmer;
+            }
+
+            @Override
+            public void remove() {
+                if (currentIndex <= 0) {
+                    throw new IllegalStateException(
+                            "You must advance the iterator using .next() before calling remove()!");
+                }
+                // we're removing the element at currentIndex - 1: slide everything after it down by one slot
+                if (currentIndex < valueCount) { // if it's the last element, don't have to do any copying
+                    System.arraycopy(storage, elementOffset(currentIndex), storage,
+                            elementOffset(currentIndex - 1),
+                            (valueCount - currentIndex) * KmerBytesWritable.getBytesPerKmer());
+                }
+                valueCount--;
+                currentIndex--;
+                Marshal.putInt(valueCount, storage, offset);
+            }
+        };
+    }
+
+    /**
+     * Removes the first instance of {@code toRemove}. Uses a linear scan.
+     *
+     * @param ignoreMissing
+     *            when false, an exception is thrown if {@code toRemove} is not in this list
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the element is missing and {@code ignoreMissing} is false
+     */
+    public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+        Iterator<KmerBytesWritable> posIterator = this.iterator();
+        while (posIterator.hasNext()) {
+            if (toRemove.equals(posIterator.next())) {
+                posIterator.remove();
+                return; // break as soon as the element is found
+            }
+        }
+        // element was not found
+        if (!ignoreMissing) {
+            throw new ArrayIndexOutOfBoundsException("the KmerBytesWritable `" + toRemove.toString()
+                    + "` was not found in this list.");
+        }
+    }
+
+    /** Removes the first instance of {@code toRemove}; throws if it is not in this list. */
+    public void remove(KmerBytesWritable toRemove) {
+        remove(toRemove, false);
+    }
+
+    /**
+     * Replaces my contents with the list serialized by {@link #write(DataOutput)}: an int element count followed
+     * by the packed kmer bytes.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int newValueCount = in.readInt();
+        if (newValueCount < 0) {
+            throw new IOException("Negative element count (" + newValueCount + ") in serialized kmer list");
+        }
+        // Forget the old elements before growing: setCapacity() copies valueCount elements out of the old
+        // storage, which must not be attempted with the incoming (possibly larger) count.
+        valueCount = 0;
+        setSize(listBytes(newValueCount));
+        // readInt() already consumed the header, so exactly the element bytes remain in the stream
+        in.readFully(storage, offset + HEADER_SIZE, newValueCount * KmerBytesWritable.getBytesPerKmer());
+        valueCount = newValueCount;
+        Marshal.putInt(valueCount, storage, offset);
+    }
+
+    /** Writes the header followed by the packed kmer bytes, i.e. {@link #getLength()} bytes in total. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(storage, offset, listBytes(valueCount));
+    }
+
+    /** Returns the number of kmers in this list. */
+    public int getCountOfPosition() {
+        return valueCount;
+    }
+
+    /** Returns the backing array itself (not a copy); the list starts at {@link #getStartOffset()}. */
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    /** Returns the index in {@link #getByteArray()} at which this list's header starts. */
+    public int getStartOffset() {
+        return offset;
+    }
+
+    /** Returns the serialized size of this list in bytes, header included. */
+    public int getLength() {
+        return listBytes(valueCount);
+    }
+
+    /** Renders the list as {@code [kmer,kmer,...]}, or {@code []} when empty. */
+    @Override
+    public String toString() {
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('[');
+        for (int i = 0; i < valueCount; i++) {
+            if (i > 0) {
+                sbuilder.append(',');
+            }
+            sbuilder.append(getPosition(i).toString());
+        }
+        sbuilder.append(']');
+        return sbuilder.toString();
+    }
+
+    /** Hashes the serialized bytes of this list (header and elements). */
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+
+    /** Number of bytes taken by a serialized list of {@code count} kmers, header included. */
+    private static int listBytes(int count) {
+        return count * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
+    }
+
+    /** Index in {@code storage} of the first byte of the i'th kmer. */
+    private int elementOffset(int i) {
+        return offset + HEADER_SIZE + i * KmerBytesWritable.getBytesPerKmer();
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index d61f679..34dfefe 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -71,7 +71,6 @@
         this.kmer.reset(0);
     }
     
-    
     public PositionListWritable getNodeIdList() {
         return nodeIdList;
     }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index 4f34542..11b0f12 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -155,6 +155,12 @@
         System.arraycopy(newData, offset + HEADER_SIZE, bytes, this.kmerStartOffset, bytesUsed);
     }
 
+    public void setAsCopy(int k, byte[] newData, int offset) {
+        // k is supplied by the caller (not read from newData), and copying starts at offset itself
+        reset(k);
+        System.arraycopy(newData, offset, bytes, this.kmerStartOffset, bytesUsed);
+    }
+    
     /**
      * Point this datablock to the given bytes array It works like the pointer
      * to new datablock.
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
index aa33350..f269b5a 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
@@ -24,7 +24,7 @@
     protected byte[] storage;
     protected int offset;
     protected int valueCount;
-    protected int storageMaxSize;  // since we may be a reference inside a larger datablock, we must track our maximum size
+    protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
 
     private VKmerBytesWritable posIter = new VKmerBytesWritable();
 
@@ -32,7 +32,7 @@
         storage = EMPTY_BYTES;
         valueCount = 0;
         offset = 0;
-        storageMaxSize = storage.length; 
+        storageMaxSize = storage.length;
     }
 
     public VKmerListWritable(byte[] data, int offset) {
@@ -55,8 +55,16 @@
 
     public void append(VKmerBytesWritable kmer) {
         setSize(getLength() + kmer.getLength());
-        System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE,
-                storage, offset + getLength(), 
+        System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE, storage, offset
+                + getLength(), kmer.getLength());
+        valueCount += 1;
+        Marshal.putInt(valueCount, storage, offset);
+    }
+
+    public void append(int k, KmerBytesWritable kmer) {
+        setSize(getLength() + HEADER_SIZE + kmer.getLength());
+        Marshal.putInt(k, storage, offset + getLength());
+        System.arraycopy(kmer.getBytes(), kmer.getOffset(), storage, offset + getLength() + HEADER_SIZE,
                 kmer.getLength());
         valueCount += 1;
         Marshal.putInt(valueCount, storage, offset);
@@ -79,7 +87,7 @@
     public void appendList(VKmerListWritable otherList) {
         if (otherList.valueCount > 0) {
             setSize(getLength() + otherList.getLength() - HEADER_SIZE); // one of the headers is redundant
-            
+
             // copy contents of otherList into the end of my storage
             System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, // skip other header
                     storage, offset + getLength(), // add to end
@@ -103,7 +111,7 @@
         for (VKmerBytesWritable kmer : otherList) {
             uniqueElements.add(kmer); // references okay
         }
-        setSize(getLength() + otherList.getLength());  // upper bound on memory usage
+        setSize(getLength() + otherList.getLength()); // upper bound on memory usage
         valueCount = 0;
         for (VKmerBytesWritable kmer : uniqueElements) {
             append(kmer);
@@ -142,9 +150,9 @@
         posIter.setAsReference(storage, getOffsetOfKmer(i));
         return posIter;
     }
-    
+
     /**
-     * Return the offset of the kmer at the i'th position 
+     * Return the offset of the kmer at the i'th position
      */
     public int getOffsetOfKmer(int i) {
         if (i >= valueCount) {
@@ -170,9 +178,8 @@
         int newLength = getLength(newData, newOffset);
         setSize(newLength);
         if (newValueCount > 0) {
-            System.arraycopy(newData, newOffset + HEADER_SIZE, 
-                    storage, this.offset + HEADER_SIZE,
-                    newLength - HEADER_SIZE);
+            System.arraycopy(newData, newOffset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newLength
+                    - HEADER_SIZE);
         }
         valueCount = newValueCount;
         Marshal.putInt(valueCount, storage, this.offset);
@@ -193,7 +200,8 @@
             @Override
             public VKmerBytesWritable next() {
                 posIter.setAsReference(storage, currentOffset);
-                currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset)) + VKmerBytesWritable.HEADER_SIZE;
+                currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset))
+                        + VKmerBytesWritable.HEADER_SIZE;
                 currentIndex++;
                 return posIter;
             }
@@ -201,16 +209,17 @@
             @Override
             public void remove() {
                 if (currentOffset <= 0) {
-                    throw new IllegalStateException("You must advance the iterator using .next() before calling remove()!");
+                    throw new IllegalStateException(
+                            "You must advance the iterator using .next() before calling remove()!");
                 }
                 // we're removing the element at prevIndex
                 int prevIndex = currentIndex - 1;
                 int prevOffset = getOffsetOfKmer(prevIndex);
-                
+
                 if (currentIndex < valueCount) { // if it's the last element, don't have to do any copying
                     System.arraycopy(storage, currentOffset, // from the "next" element
                             storage, prevOffset, // to the one just returned (overwriting it)
-                            getLength() - currentOffset + offset);  // remaining bytes except current element
+                            getLength() - currentOffset + offset); // remaining bytes except current element
                 }
                 valueCount--;
                 currentIndex--;
@@ -230,7 +239,7 @@
         while (posIterator.hasNext()) {
             if (toRemove.equals(posIterator.next())) {
                 posIterator.remove();
-                return;  // break as soon as the element is found 
+                return; // break as soon as the element is found 
             }
         }
         // element was not found
@@ -258,7 +267,8 @@
             elemBytes = KmerUtil.getByteNumFromK(elemLetters) + VKmerBytesWritable.HEADER_SIZE;
             setSize(curLength + elemBytes); // make sure we have room for the new element
             Marshal.putInt(elemLetters, storage, curOffset); // write header
-            in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes - VKmerBytesWritable.HEADER_SIZE); // write kmer
+            in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes
+                    - VKmerBytesWritable.HEADER_SIZE); // write kmer
             curOffset += elemBytes;
             curLength += elemBytes;
         }
@@ -284,19 +294,21 @@
     public int getLength() {
         int totalSize = HEADER_SIZE;
         for (int curCount = 0; curCount < valueCount; curCount++) {
-            totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
+            totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize))
+                    + VKmerBytesWritable.HEADER_SIZE;
         }
         return totalSize;
     }
-    
+
     public static int getLength(byte[] listStorage, int listOffset) {
-      int totalSize = HEADER_SIZE;
-      int listValueCount = Marshal.getInt(listStorage, listOffset);
-      for (int curCount = 0; curCount < listValueCount; curCount++) {
-          totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
-      }
-      return totalSize;
-  }
+        int totalSize = HEADER_SIZE;
+        int listValueCount = Marshal.getInt(listStorage, listOffset);
+        for (int curCount = 0; curCount < listValueCount; curCount++) {
+            totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize))
+                    + VKmerBytesWritable.HEADER_SIZE;
+        }
+        return totalSize;
+    }
 
     @Override
     public String toString() {
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
new file mode 100644
index 0000000..5bcf663
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.genomix.data.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Checks that a {@link NodeWritable} rebuilt from {@code marshalToByteArray()} prints the same as a node that was
+ * populated directly, while walking kmer by kmer over random reads and resetting the nodes in between.
+ */
+public class NodeWritableTest {
+
+    @Test
+    public void testNodeReset() throws IOException {
+        NodeWritable outputNode = new NodeWritable();
+        NodeWritable inputNode = new NodeWritable();
+
+        KmerListWritable nextKmerList = new KmerListWritable();
+        KmerListWritable preKmerList = new KmerListWritable();
+        KmerBytesWritable preKmer = new KmerBytesWritable();
+        KmerBytesWritable curKmer = new KmerBytesWritable();
+        KmerBytesWritable nextKmer = new KmerBytesWritable();
+        PositionWritable nodeId = new PositionWritable();
+        PositionListWritable nodeIdList = new PositionListWritable();
+        KmerBytesWritable.setGlobalKmerLength(5);
+
+        nodeId.set((byte) 0, 1L, 0);
+        nodeIdList.append(nodeId);
+
+        // reads of length 6..10, i.e. one to five letters longer than the kmer length set above
+        for (int readLength = 6; readLength <= 10; readLength++) {
+            NodeWritable expectedNode = new NodeWritable();
+            byte[] read = generaterRandomString(readLength).getBytes();
+
+            /* first kmer: only the FF list is set */
+            curKmer.setByRead(read, 0);
+            preKmer.setAsCopy(curKmer);
+            nextKmer.setAsCopy(curKmer);
+            nextKmer.shiftKmerWithNextChar(read[5]);
+
+            nextKmerList.append(nextKmer);
+
+            outputNode.setNodeIdList(nodeIdList);
+            outputNode.setFFList(nextKmerList);
+
+            expectedNode.setNodeIdList(nodeIdList);
+            expectedNode.setFFList(nextKmerList);
+
+            assertRoundTrip(expectedNode, outputNode, inputNode);
+
+            /* middle kmers: both the FF and the RR list are set */
+            for (int pos = 5; pos < read.length - 1; pos++) {
+                outputNode.reset();
+                curKmer.setAsCopy(nextKmer);
+
+                nextKmer.shiftKmerWithNextChar(read[pos + 1]);
+                nextKmerList.reset();
+                nextKmerList.append(nextKmer);
+                preKmerList.reset();
+                preKmerList.append(preKmer);
+                outputNode.setNodeIdList(nodeIdList);
+                outputNode.setFFList(nextKmerList);
+                outputNode.setRRList(preKmerList);
+                expectedNode.reset();
+                expectedNode.setNodeIdList(nodeIdList);
+                expectedNode.setFFList(nextKmerList);
+                expectedNode.setRRList(preKmerList);
+                preKmer.setAsCopy(curKmer);
+                inputNode.reset();
+                assertRoundTrip(expectedNode, outputNode, inputNode);
+            }
+
+            /* last kmer: only the RR list is set */
+            curKmer.setAsCopy(nextKmer);
+            preKmerList.reset();
+            preKmerList.append(preKmer);
+            outputNode.reset();
+            outputNode.setNodeIdList(nodeIdList);
+            outputNode.setRRList(preKmerList);
+            expectedNode.reset();
+            expectedNode.setNodeIdList(nodeIdList);
+            expectedNode.setRRList(preKmerList);
+            inputNode.reset();
+            assertRoundTrip(expectedNode, outputNode, inputNode);
+        }
+    }
+
+    /**
+     * Points {@code target} at the marshalled bytes of {@code source} and asserts that it then prints exactly like
+     * {@code expected}.
+     */
+    private static void assertRoundTrip(NodeWritable expected, NodeWritable source, NodeWritable target)
+            throws IOException {
+        target.setAsReference(source.marshalToByteArray(), 0);
+        Assert.assertEquals(expected.toString(), target.toString());
+    }
+
+    /** Returns a random string of {@code n} letters drawn from the alphabet {@code ACGT}. */
+    public String generaterRandomString(int n) {
+        final String alphabet = "ACGT";
+        Random random = new Random();
+        StringBuilder sb = new StringBuilder();
+        for (int remaining = n; remaining > 0; remaining--) {
+            sb.append(alphabet.charAt(random.nextInt(alphabet.length())));
+        }
+        return sb.toString();
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 42d8db6..c77fa70 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -18,14 +18,12 @@
 import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -47,24 +45,23 @@
 
     public static final int OutputKmerField = 0;
     public static final int OutputNodeField = 1;
-    
 
     private final int readLength;
     private final int kmerSize;
 
     public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-            null});
+            null });
 
     public ReadsKeyValueParserFactory(int readlength, int k) {
         this.readLength = readlength;
         this.kmerSize = k;
     }
-    
-    public static enum KmerDir {
+
+    public enum KmerDir {
         FORWARD,
         REVERSE,
     }
-    
+
     @Override
     public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
         final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
@@ -73,27 +70,22 @@
         outputAppender.reset(outputBuffer, true);
         KmerBytesWritable.setGlobalKmerLength(kmerSize);
         return new IKeyValueParser<LongWritable, Text>() {
-            
+
             private PositionWritable nodeId = new PositionWritable();
             private PositionListWritable nodeIdList = new PositionListWritable();
-            private VKmerListWritable edgeListForPreKmer = new VKmerListWritable();
-            private VKmerListWritable edgeListForNextKmer = new VKmerListWritable();
-            private NodeWritable outputNode = new NodeWritable();
-//            private NodeWritable outputNode2 = new NodeWritable();
+            private NodeWritable curNode = new NodeWritable();
+            private NodeWritable nextNode = new NodeWritable();
 
-            private KmerBytesWritable preForwardKmer = new KmerBytesWritable();           
-            private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
             private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
             private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
             private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
             private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
-            
-            private KmerDir preKmerDir = KmerDir.FORWARD;
+
             private KmerDir curKmerDir = KmerDir.FORWARD;
             private KmerDir nextKmerDir = KmerDir.FORWARD;
 
             byte mateId = (byte) 0;
-            
+
             @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -125,145 +117,87 @@
                 if (kmerSize >= array.length) {
                     return;
                 }
-                outputNode.reset();
+                curNode.reset();
+                nextNode.reset();
                 curForwardKmer.setByRead(array, 0);
                 curReverseKmer.setByReadReverse(array, 0);
                 curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
-                setNextKmer(array[kmerSize]);
-                setnodeId(mateId, readID, 0);
-                setEdgeListForNextKmer();
-                writeToFrame(writer);
+                nextForwardKmer.setAsCopy(curForwardKmer);
+                setNextKmer(nextForwardKmer, nextReverseKmer, nextKmerDir, array[kmerSize]);
+                setnodeId(curNode, mateId, readID, 0);
+                setnodeId(nextNode, mateId, readID, 0);
+                setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+                writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
 
                 /*middle kmer*/
-                int i = kmerSize;
-                for (; i < array.length - 1; i++) {
-                    outputNode.reset();
-                    setPreKmerByOldCurKmer();
-                    setCurKmerByOldNextKmer();
-                    setNextKmer(array[i]);
-                    setnodeId(mateId, readID, 0);//i - kmerSize + 1
-                    setEdgeListForPreKmer();
-                    setEdgeListForNextKmer();
-                    writeToFrame(writer);
+                for (int i = kmerSize + 1; i < array.length; i++) { // array[kmerSize] was consumed by the first kmer
+                    curKmerDir = nextKmerDir; // the previous "next" kmer becomes the current one
+                    curForwardKmer.setAsCopy(nextForwardKmer);
+                    curReverseKmer.setAsCopy(nextReverseKmer);
+                    curNode.set(nextNode);
+                    nextNode.reset();
+                    setNextKmer(nextForwardKmer, nextReverseKmer, nextKmerDir, array[i]); // shift in base i, not a fixed base
+                    setnodeId(nextNode, mateId, readID, 0);
+                    setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+                    writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
                 }
-                
+
                 /*last kmer*/
-                outputNode.reset();
-                setPreKmerByOldCurKmer();
-                setCurKmerByOldNextKmer();
-                setnodeId(mateId, readID, 0);//array.length - kmerSize + 1
-                setEdgeListForPreKmer();
-                writeToFrame(writer);
+                writeToFrame(nextForwardKmer, nextReverseKmer, nextKmerDir, nextNode, writer);
             }
-            
-            public void setnodeId(byte mateId, long readID, int posId){
+
+            public void setnodeId(NodeWritable node, byte mateId, long readID, int posId) {
                 nodeId.set(mateId, readID, posId);
                 nodeIdList.reset();
                 nodeIdList.append(nodeId);
-                outputNode.setNodeIdList(nodeIdList);
-            }
-            
-            public void setNextKmer(byte nextChar){
-                nextForwardKmer.setAsCopy(curForwardKmer);
-                nextForwardKmer.shiftKmerWithNextChar(nextChar);
-                nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
-                nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
-            }
-            
-            public void setPreKmerByOldCurKmer(){
-                preKmerDir = curKmerDir;
-                preForwardKmer.setAsCopy(curForwardKmer);
-                preReverseKmer.setAsCopy(curReverseKmer);
+                node.setNodeIdList(nodeIdList);
             }
 
-            public void setCurKmerByOldNextKmer(){
-                curKmerDir = nextKmerDir;
-                curForwardKmer.setAsCopy(nextForwardKmer);
-                curReverseKmer.setAsCopy(nextReverseKmer);
+            public void setNextKmer(KmerBytesWritable forwardKmer, KmerBytesWritable ReverseKmer, KmerDir nextKmerDir,
+                    byte nextChar) { // NOTE: the nextKmerDir argument is a by-value copy; the result goes to the field
+                forwardKmer.shiftKmerWithNextChar(nextChar); // advance the forward kmer by one base
+                ReverseKmer.setByReadReverse(forwardKmer.toString().getBytes(), forwardKmer.getOffset());
+                this.nextKmerDir = forwardKmer.compareTo(ReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
             }
-            
-            public void writeToFrame(IFrameWriter writer) {
-                switch(curKmerDir){
+
+            public void writeToFrame(KmerBytesWritable forwardKmer, KmerBytesWritable reverseKmer, KmerDir curKmerDir,
+                    NodeWritable node, IFrameWriter writer) { // emits node keyed by the kmer that curKmerDir selects
+                switch (curKmerDir) {
                     case FORWARD:
-                        InsertToFrame(curForwardKmer, outputNode, writer);
+                        InsertToFrame(forwardKmer, node, writer);
                         break;
                     case REVERSE:
-                        InsertToFrame(curReverseKmer, outputNode, writer);
+                        InsertToFrame(reverseKmer, node, writer); // was forwardKmer: REVERSE must key on the reverse kmer
                         break;
                 }
             }
-            public void setEdgeListForPreKmer(){
-                switch(curKmerDir){
-                    case FORWARD:
-                        switch(preKmerDir){
-                            case FORWARD:
-                                edgeListForPreKmer.reset();
-                                edgeListForPreKmer.append(preForwardKmer);
-                                outputNode.setRRList(edgeListForPreKmer);
-                                break;
-                            case REVERSE:
-                                edgeListForPreKmer.reset();
-                                edgeListForPreKmer.append(preReverseKmer);
-                                outputNode.setRFList(edgeListForPreKmer);
-                                break;
-                        }
-                        break;
-                    case REVERSE:
-                        switch(preKmerDir){
-                            case FORWARD:
-                                edgeListForPreKmer.reset();
-                                edgeListForPreKmer.append(preForwardKmer);
-                                outputNode.setFRList(edgeListForPreKmer);
-                                break;
-                            case REVERSE:
-                                edgeListForPreKmer.reset();
-                                edgeListForPreKmer.append(preReverseKmer);
-                                outputNode.setFFList(edgeListForPreKmer);
-                                break;
-                        }
-                        break;
+
+            public void setEdgeListForCurAndNextKmer(KmerDir curKmerDir, NodeWritable curNode, KmerDir nextKmerDir,
+                    NodeWritable nextNode) { // links both nodes; kmers come from the cur*/next* kmer fields, not parameters
+                if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.FORWARD) { // cur gets an FF edge, next an RR edge
+                    curNode.getFFList().append(kmerSize, nextForwardKmer);
+                    nextNode.getRRList().append(kmerSize, curForwardKmer);
+                }
+                if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.REVERSE) { // both nodes get an FR edge
+                    curNode.getFRList().append(kmerSize, nextReverseKmer);
+                    nextNode.getFRList().append(kmerSize, curForwardKmer);
+                }
+                if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.FORWARD) { // both nodes get an RF edge
+                    curNode.getRFList().append(kmerSize, nextForwardKmer);
+                    nextNode.getRFList().append(kmerSize, curReverseKmer);
+                }
+                if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.REVERSE) { // cur gets an RR edge, next an FF edge
+                    curNode.getRRList().append(kmerSize, nextReverseKmer);
+                    nextNode.getFFList().append(kmerSize, curReverseKmer);
                 }
             }
-            
-            public void setEdgeListForNextKmer(){
-                switch(curKmerDir){
-                    case FORWARD:
-                        switch(nextKmerDir){
-                            case FORWARD:
-                                edgeListForNextKmer.reset();
-                                edgeListForNextKmer.append(nextForwardKmer);
-                                outputNode.setFFList(edgeListForNextKmer);
-                                break;
-                            case REVERSE:
-                                edgeListForNextKmer.reset();
-                                edgeListForNextKmer.append(nextReverseKmer);
-                                outputNode.setFRList(edgeListForNextKmer);
-                                break;
-                        }
-                        break;
-                    case REVERSE:
-                        switch(nextKmerDir){
-                            case FORWARD:
-                                edgeListForNextKmer.reset();
-                                edgeListForNextKmer.append(nextForwardKmer);
-                                outputNode.setRFList(edgeListForNextKmer);
-                                break;
-                            case REVERSE:
-                                edgeListForNextKmer.reset();
-                                edgeListForNextKmer.append(nextReverseKmer);
-                                outputNode.setRRList(edgeListForNextKmer);
-                                break;
-                        }
-                        break;
-                }
-            }
-            
+
             private void InsertToFrame(KmerBytesWritable kmer, NodeWritable node, IFrameWriter writer) {
                 try {
                     tupleBuilder.reset();
                     tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
                     tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
-                    
+
                     if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                             tupleBuilder.getSize())) {
                         FrameUtils.flushFrame(outputBuffer, writer);
@@ -289,5 +223,4 @@
             }
         };
     }
-
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 46fdd0e..a484179 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -48,6 +48,7 @@
         return new IAggregatorDescriptor() {
             
             private NodeWritable readNode = new NodeWritable();
+//            private KmerBytesWritable readKeyKmer = new KmerBytesWritable();
             
             protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -75,13 +76,14 @@
                     AggregateState state) throws HyracksDataException {
                 NodeWritable localUniNode = (NodeWritable) state.state;
                 localUniNode.reset();
+//                readKeyKmer.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 0));
                 readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
                 localUniNode.getFFList().appendList(readNode.getFFList());
                 localUniNode.getFRList().appendList(readNode.getFRList());
                 localUniNode.getRFList().appendList(readNode.getRFList());
                 localUniNode.getRRList().appendList(readNode.getRRList());
-
+                
                 // make an empty field
                 tupleBuilder.addFieldEndOffset();// mark question?
             }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 1ee6cae..aa3c9f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -20,8 +20,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -70,6 +73,9 @@
                 localUniNode.reset();
                 readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+//                VKmerBytesWritable a = new VKmerBytesWritable();
+ //               a.setAsCopy(readNode.getFFList().getPosition(0));
+  //              int kRequested = Marshal.getInt(readNode.getFFList().getByteArray(), readNode.getFFList().getStartOffset() + 4);
                 localUniNode.getFFList().unionUpdate(readNode.getFFList());
                 localUniNode.getFRList().unionUpdate(readNode.getFRList());
                 localUniNode.getRFList().unionUpdate(readNode.getRFList());
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
new file mode 100644
index 0000000..e248c3b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class AssembleKeyIntoNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+    public AssembleKeyIntoNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = outRecDesc;
+        this.kmerSize = kmerSize;
+        KmerBytesWritable.setGlobalKmerLength(this.kmerSize);
+    }
+
+    private static final long serialVersionUID = 1L;
+    private final int kmerSize;
+
+    public static final int InputKmerField = 0;
+    public static final int InputtempNodeField = 1;
+    public static final int OutputNodeField = 0;
+
+    public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[1]);
+
+    public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        public static final int INT_LENGTH = 4;  
+        private final IHyracksTaskContext ctx;  
+        private final RecordDescriptor inputRecDesc; 
+        private final RecordDescriptor outputRecDesc; 
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+        
+        NodeWritable readNode;
+        KmerBytesWritable readKmer;
+
+        public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+   
+            readNode = new NodeWritable();
+            readKmer = new KmerBytesWritable();            
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                generateNodeFromKmer(i);
+            }
+        }
+
+        private void generateNodeFromKmer(int tIndex) throws HyracksDataException {
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+            
+            setKmer(readKmer, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputKmerField));
+            readNode.reset();
+            setNode(readNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputtempNodeField));
+            readNode.getKmer().setAsCopy(readKmer.getKmerLength(), readKmer.getBytes(), readKmer.getOffset());
+            outputNode(readNode);
+        }
+
+
+        private void setKmer(KmerBytesWritable kmer, int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            kmer.setAsCopy(buffer.array(), offset);
+        }
+
+        private void setNode(NodeWritable node, int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            node.setAsCopy(buffer.array(), offset);
+        }
+
+
+        private void outputNode(NodeWritable node) throws HyracksDataException {
+            // Serialize the node as the single output field and append it to the output frame.
+            try {
+                builder.reset();
+                builder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
+                // A failed append is treated as "frame full": flush, reset, and retry once on the empty frame.
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                        throw new IllegalStateException("Failed to append tuplebuilder to frame");
+                    }
+                }
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to Add a field to the tupleBuilder.", e); // keep the root cause
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+                recordDescriptors[0]);
+    }
+
+}
+
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
index b7b7054..fa6ae9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -17,7 +17,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +33,7 @@
      */
     private static final long serialVersionUID = 1L;
     private final int kmerSize;
-    public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
-    public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+    public static final int OutputNodeField = AssembleKeyIntoNodeOperator.OutputNodeField;
     
     public NodeTextWriterFactory(int k) {
         this.kmerSize = k;
@@ -53,9 +52,7 @@
 
             @Override
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
-                node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
-                node.getKmer().reset(kmerSize);
-                node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+                node.setAsReference(tuple.getFieldData(OutputNodeField), tuple.getFieldStart(OutputNodeField));
                 try {
                     output.write(node.toString().getBytes());
                     output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index afc1cf7..6a5dcc4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
 import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
@@ -181,6 +182,16 @@
         return kmerCrossAggregator;
     }
 
+    public AbstractOperatorDescriptor generateKmerToFinalNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) {
+
+        AbstractOperatorDescriptor mapToFinalNode = new AssembleKeyIntoNodeOperator(jobSpec,
+                AssembleKeyIntoNodeOperator.nodeOutputRec, kmerSize);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapToFinalNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapToFinalNode;
+    }
+    
     public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
             AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
         ITupleWriterFactory nodeWriter = null;
@@ -209,10 +220,15 @@
         logDebug("Group by Kmer");
         AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
 
+        logDebug("Generate final node");
+        lastOperator = generateKmerToFinalNode(jobSpec, lastOperator);
+
+        jobSpec.addRoot(lastOperator);
+        
         logDebug("Write node to result");
         lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
 
-        jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
+        jobSpec.addRoot(lastOperator);
         return jobSpec;
     }
 
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index 25915aa..bf87b23 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -32,7 +32,7 @@
 @SuppressWarnings("deprecation")
 public class JobRun {
     private static final int KmerSize = 5;
-    private static final int ReadLength = 6;
+    private static final int ReadLength = 7;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
@@ -52,8 +52,8 @@
     
     @Test
     public void TestAll() throws Exception {
-        TestReader();
-//        TestGroupby();
+//        TestReader();
+        TestGroupby();
     }
     
     public void TestReader() throws Exception {
@@ -68,7 +68,7 @@
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-//        dumpResult();
+        dumpResult();
     }
     
     @Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 51a0d15..fbbc89a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
     private static final int KmerSize = 5;
-    private static final int ReadLength = 8;
+    private static final int ReadLength = 7;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test1.txt";
+    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test.txt";
     private static final String HDFS_INPUT_PATH = "/webmap";
     private static final String HDFS_OUTPUT_PATH = "/webmap_result";
 
@@ -75,11 +75,11 @@
 
     @Test
     public void TestAll() throws Exception {
-//        TestReader();
+        TestReader();
 //        TestGroupbyKmer();
 //        TestMapKmerToRead();
 //        TestGroupByReadID();
-        TestEndToEnd();
+//        TestEndToEnd();
 //        TestUnMergedNode();
     }
 
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1	AATAG
+2	CATAC
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 3f1cd5c..a720dc4 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1	AATAGA
+1	AATAGAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 83f286c..5e7a856 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -313,7 +313,7 @@
      */
     public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
             byte neighborToMergeDir, KmerBytesWritable nodeToAdd, 
-            int kmerSize, KmerBytesWritable kmer){
+            int kmerSize, VKmerBytesWritable kmer){
         switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
             case MessageFlag.DIR_FF:
                 this.getFFList().remove(nodeToDelete); //set(null);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 64965e3..fc0cd4b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -95,7 +95,7 @@
     /**
      * get destination vertex
      */
-    public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+    public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
         if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
             posIterator = value.getFFList().iterator();
             outFlag &= MessageFlag.DIR_CLEAR;
@@ -112,7 +112,7 @@
         
     }
 
-    public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+    public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
         if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
             posIterator = value.getRFList().iterator();
             outFlag &= MessageFlag.DIR_CLEAR;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 99cd3c2..c1b1dda 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -51,7 +51,7 @@
             kmerList.reset();
         if(fakeVertex == null){
 //            fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
-            fakeVertex = new KmerBytesWritable();
+            fakeVertex = new VKmerBytesWritable();
             String random = generaterRandomString(kmerSize + 1);
             fakeVertex.setByRead(random.getBytes(), 0); 
         }
@@ -187,7 +187,7 @@
         job.setVertexInputFormatClass(GraphCleanInputFormat.class);
         job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
         job.setDynamicVertexValueSize(true);
-        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputKeyClass(VKmerBytesWritable.class);
         job.setOutputValueClass(VertexValueWritable.class);
         Client.run(args, job);
     }