Merge branch 'genomix/velvet_graphbuilding' of https://code.google.com/p/hyracks into genomix/velvet_graphbuilding
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 5be5f83..03e2fd9 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -50,96 +50,10 @@
     }
 
     public static byte getSymbolFromCode(byte code) {
-        if (code > 3) {
-            return '!';
+        if (code > 3 || code < 0) { // only codes 0..3 index into GENE_SYMBOL
+            throw new IllegalArgumentException("No such gene code: " + code);
         }
         return GENE_SYMBOL[code];
     }
 
-    public static byte getAdjBit(byte t) {
-        byte r = 0;
-        switch (t) {
-            case 'A':
-            case 'a':
-                r = 1 << A;
-                break;
-            case 'C':
-            case 'c':
-                r = 1 << C;
-                break;
-            case 'G':
-            case 'g':
-                r = 1 << G;
-                break;
-            case 'T':
-            case 't':
-                r = 1 << T;
-                break;
-        }
-        return r;
-    }
-
-    /**
-     * It works for path merge. Merge the kmer by his next, we need to make sure
-     * the @{t} is a single neighbor.
-     * 
-     * @param t
-     *            the neighbor code in BitMap
-     * @return the genecode
-     */
-    public static byte getGeneCodeFromBitMap(byte t) {
-        switch (t) {
-            case 1 << A:
-                return A;
-            case 1 << C:
-                return C;
-            case 1 << G:
-                return G;
-            case 1 << T:
-                return T;
-        }
-        return -1;
-    }
-
-    public static byte getBitMapFromGeneCode(byte t) {
-        return (byte) (1 << t);
-    }
-
-    public static int countNumberOfBitSet(int i) {
-        int c = 0;
-        for (; i != 0; c++) {
-            i &= i - 1;
-        }
-        return c;
-    }
-
-    public static int inDegree(byte bitmap) {
-        return countNumberOfBitSet((bitmap >> 4) & 0x0f);
-    }
-
-    public static int outDegree(byte bitmap) {
-        return countNumberOfBitSet(bitmap & 0x0f);
-    }
-
-    public static byte mergePreNextAdj(byte pre, byte next) {
-        return (byte) (pre << 4 | (next & 0x0f));
-    }
-
-    public static String getSymbolFromBitMap(byte code) {
-        int left = (code >> 4) & 0x0F;
-        int right = code & 0x0F;
-        StringBuilder str = new StringBuilder();
-        for (int i = A; i <= T; i++) {
-            if ((left & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        str.append('|');
-        for (int i = A; i <= T; i++) {
-            if ((right & (1 << i)) != 0) {
-                str.append((char) GENE_SYMBOL[i]);
-            }
-        }
-        return str.toString();
-    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index fd4c252..7e578f6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -25,7 +25,7 @@
 import org.apache.hadoop.io.WritableComparator;
 
 /**
- * Fix kmer length byteswritable
+ * Variable kmer length byteswritable
  * It was used to generate the graph in which phase the kmer length doesn't change.
  * Thus the size of bytes doesn't change either.
  */
@@ -38,25 +38,16 @@
 
     protected int size;
     protected byte[] bytes;
+    protected int offset;
     protected int kmerlength;
 
     @Deprecated
     public KmerBytesWritable() {
-        this(0, EMPTY_BYTES);
+        this(0, EMPTY_BYTES, 0);
     }
 
-    public KmerBytesWritable(int k, byte[] storage) {
-        this.kmerlength = k;
-        if (k > 0) {
-            this.size = KmerUtil.getByteNumFromK(kmerlength);
-            this.bytes = storage;
-            if (this.bytes.length < size) {
-                throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
-            }
-        } else {
-            this.bytes = storage;
-            this.size = 0;
-        }
+    public KmerBytesWritable(int k, byte[] storage, int offset) {
+        setNewReference(k, storage, offset);
     }
 
     /**
@@ -73,28 +64,92 @@
         } else {
             this.bytes = EMPTY_BYTES;
         }
+        this.offset = 0;
     }
 
     public KmerBytesWritable(KmerBytesWritable right) {
-        if (right != null) {
-            this.kmerlength = right.kmerlength;
-            this.size = right.size;
-            this.bytes = new byte[right.size];
-            set(right);
-        }else{
-            this.kmerlength = 0;
-            this.size = 0;
-            this.bytes = EMPTY_BYTES;
+        this(right == null ? 0 : right.kmerlength); // a null source copies as an empty kmer instead of throwing NPE
+        set(right); // set(KmerBytesWritable) handles null itself
+    }
+
+    public void set(KmerBytesWritable newData) { // copy length and bytes from newData; null resets to empty
+        if (newData == null) {
+            this.set(0, EMPTY_BYTES, 0);
+        } else {
+            this.set(newData.kmerlength, newData.bytes, newData.offset); // source bytes start at the source's own offset
+        }
+    }
+
+    public void set(byte[] newData, int offset) { // copy size bytes from newData[offset..] without changing kmerlength
+        if (kmerlength > 0) {
+            System.arraycopy(newData, offset, bytes, this.offset, size); // parameter shadows the field: write at this kmer's own offset
+        }
+    }
+
+    public void set(int k, byte[] newData, int offset) {
+        reset(k); // adopt length k and resize storage before copying
+        if (k > 0) {
+            System.arraycopy(newData, offset, bytes, this.offset, size); // read at the parameter offset, write at this kmer's own offset
+        }
+    }
+
+    /**
+     * Reset to kmer length k: resize storage to KmerUtil.getByteNumFromK(k) bytes, then clearLeadBit().
+     * 
+     * @param k the new kmer length
+     */
+    public void reset(int k) {
+        this.kmerlength = k;
+        setSize(KmerUtil.getByteNumFromK(k));
+        clearLeadBit();
+    }
+
+    public void setNewReference(byte[] newData, int offset) { // point this kmer at external storage without copying
+        if (newData.length - offset < size) { // validate first so a failed call leaves this kmer untouched
+            throw new IllegalArgumentException("Not given enough space");
+        }
+        this.bytes = newData;
+        this.offset = offset;
+    }
+
+    public void setNewReference(int k, byte[] newData, int offset) {
+        this.kmerlength = k;
+        this.size = KmerUtil.getByteNumFromK(k); // size must be set before the space check in the two-argument overload
+        setNewReference(newData, offset);
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2)); // grow to 1.5x the requested size
+        }
+        this.size = size;
+    }
+
+    protected int getCapacity() { // number of bytes usable by this kmer in the current array
+        return bytes.length - offset; // data starts at offset, so bytes before it are not available
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap != getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (new_cap < size) {
+                size = new_cap; // shrinking truncates the stored bytes
+            }
+            if (size != 0) {
+                System.arraycopy(bytes, offset, new_data, 0, size); // move content to the start of the new array
+            }
+            bytes = new_data;
+            offset = 0; // data now starts at index 0 of the freshly allocated array
         }
     }
 
     public byte getGeneCodeAtPosition(int pos) {
         if (pos >= kmerlength) {
-            return -1;
+            throw new IllegalArgumentException("gene position out of bound");
         }
         int posByte = pos / 4;
         int shift = (pos % 4) << 1;
-        return (byte) ((bytes[size - 1 - posByte] >> shift) & 0x3);
+        return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
     }
 
     public int getKmerLength() {
@@ -106,6 +161,10 @@
         return bytes;
     }
 
+    public int getOffset() {
+        return offset; // index into getBytes() that the kmer accessors add to every byte index
+    }
+
     @Override
     public int getLength() {
         return size;
@@ -128,16 +187,21 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[0] = l;
+            bytes[offset] = l;
         }
     }
 
+    public void setByRead(int k, byte[] array, int start) {
+        reset(k); // adopt kmer length k first, then delegate to the two-argument overload
+        setByRead(array, start);
+    }
+
     /**
      * Compress Reversed Kmer into bytes array AATAG will compress as
      * [0x000A,0xATAG]
@@ -156,16 +220,21 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[0] = l;
+            bytes[offset] = l;
         }
     }
 
+    public void setByReadReverse(int k, byte[] array, int start) {
+        reset(k); // adopt kmer length k first, then delegate to the two-argument overload
+        setByReadReverse(array, start);
+    }
+
     /**
      * Shift Kmer to accept new char input
      * 
@@ -185,14 +254,14 @@
      * @return the shift out gene, in gene code format
      */
     public byte shiftKmerWithNextCode(byte c) {
-        byte output = (byte) (bytes[size - 1] & 0x03);
+        byte output = (byte) (bytes[offset + size - 1] & 0x03);
         for (int i = size - 1; i > 0; i--) {
-            byte in = (byte) (bytes[i - 1] & 0x03);
-            bytes[i] = (byte) (((bytes[i] >>> 2) & 0x3f) | (in << 6));
+            byte in = (byte) (bytes[offset + i - 1] & 0x03);
+            bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
         }
         int pos = ((kmerlength - 1) % 4) << 1;
         byte code = (byte) (c << pos);
-        bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
+        bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
         clearLeadBit();
         return output;
     }
@@ -217,34 +286,44 @@
      */
     public byte shiftKmerWithPreCode(byte c) {
         int pos = ((kmerlength - 1) % 4) << 1;
-        byte output = (byte) ((bytes[0] >> pos) & 0x03);
+        byte output = (byte) ((bytes[offset] >> pos) & 0x03);
         for (int i = 0; i < size - 1; i++) {
-            byte in = (byte) ((bytes[i + 1] >> 6) & 0x03);
-            bytes[i] = (byte) ((bytes[i] << 2) | in);
+            byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+            bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
         }
-        bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
+        bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
         clearLeadBit();
         return output;
     }
 
+    /**
+     * Merge this kmer in place with its next neighbor in gene-code format.
+     * The kmer length increases by 1 (storage is resized through setSize).
+     * e.g. AAGCT merge with A => AAGCTA
+     * 
+     * This method returns nothing: this kmer itself becomes the merged
+     * kmer of length k+1.
+     * 
+     * @param nextCode
+     *            next neighbor in gene-code format; only its low two bits are used
+     */
+    public void mergeKmerWithNextCode(byte nextCode) {
+        this.kmerlength += 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        if (kmerlength % 4 == 1) { // new gene opens a new lead byte: move every existing byte up one index
+            for (int i = getLength() - 1; i > 0; i--) {
+                bytes[offset + i] = bytes[offset + i - 1];
+            }
+            bytes[offset] = (byte) (nextCode & 0x3);
+        } else { // lead byte still has room: OR the code into its 2-bit slot
+            bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength-1) % 4) << 1)));
+        }
+        clearLeadBit();
+    }
+
     protected void clearLeadBit() {
         if (kmerlength % 4 != 0) {
-            bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
-        }
-    }
-
-    public void set(KmerBytesWritable newData) {
-        if (kmerlength != newData.kmerlength){
-            throw new IllegalArgumentException("kmerSize is different, try to use VKmerBytesWritable instead");
-        }
-        if (kmerlength > 0 ){
-            set(newData.bytes, 0, newData.size);
-        }
-    }
-
-    public void set(byte[] newData, int offset, int length) {
-        if (kmerlength > 0){
-            System.arraycopy(newData, offset, bytes, 0, size);
+            bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
         }
     }
 
@@ -259,8 +338,9 @@
         if (this.kmerlength > 0) {
             if (this.bytes.length < this.size) {
                 this.bytes = new byte[this.size];
+                this.offset = 0;
             }
-            in.readFully(bytes, 0, size);
+            in.readFully(bytes, offset, size);
         }
     }
 
@@ -268,7 +348,7 @@
     public void write(DataOutput out) throws IOException {
         out.writeInt(kmerlength);
         if (kmerlength > 0) {
-            out.write(bytes, 0, size);
+            out.write(bytes, offset, size);
         }
     }
 
@@ -286,7 +366,7 @@
 
     @Override
     public String toString() {
-        return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), 0, this.getLength());
+        return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
     }
 
     public static class Comparator extends WritableComparator {
@@ -309,5 +389,4 @@
     static { // register this comparator
         WritableComparator.define(KmerBytesWritable.class, new Comparator());
     }
-
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
similarity index 88%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
index c00967f..9d458d2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
@@ -15,11 +15,11 @@
 
 package edu.uci.ics.genomix.type;
 
-public class VKmerBytesWritableFactory {
-    private VKmerBytesWritable kmer;
+public class KmerBytesWritableFactory {
+    private KmerBytesWritable kmer;
 
-    public VKmerBytesWritableFactory(int k) {
-        kmer = new VKmerBytesWritable(k);
+    public KmerBytesWritableFactory(int k) {
+        kmer = new KmerBytesWritable(k);
     }
 
     /**
@@ -30,8 +30,9 @@
      * @param array
      * @param start
      */
-    public VKmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
-        kmer.setByRead(k, array, start);
+    public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByRead(array, start);
         return kmer;
     }
 
@@ -42,8 +43,9 @@
      * @param array
      * @param start
      */
-    public VKmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
-        kmer.setByReadReverse(k, array, start);
+    public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByReadReverse(array, start);
         return kmer;
     }
 
@@ -57,7 +59,7 @@
      * @param kmerChain
      * @return LastKmer bytes array
      */
-    public VKmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
         if (lastK > kmerChain.getKmerLength()) {
             return null;
         }
@@ -93,7 +95,7 @@
      * @param kmerChain
      * @return FirstKmer bytes array
      */
-    public VKmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
         if (firstK > kmerChain.getKmerLength()) {
             return null;
         }
@@ -117,7 +119,7 @@
         return kmer;
     }
 
-    public VKmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+    public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
         if (startK + kSize > kmerChain.getKmerLength()) {
             return null;
         }
@@ -157,7 +159,7 @@
      *            : next neighbor in gene-code format
      * @return the merged Kmer, this K of this Kmer is k+1
      */
-    public VKmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+    public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
         this.kmer.reset(kmer.getKmerLength() + 1);
         for (int i = 1; i <= kmer.getLength(); i++) {
             this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
@@ -184,7 +186,7 @@
      *            : next neighbor in gene-code format
      * @return the merged Kmer,this K of this Kmer is k+1
      */
-    public VKmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+    public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
         this.kmer.reset(kmer.getKmerLength() + 1);
         int byteInMergedKmer = 0;
         if (kmer.getKmerLength() % 4 == 0) {
@@ -213,7 +215,7 @@
      *            : bytes array of next kmer
      * @return merged kmer, the new k is @preK + @nextK
      */
-    public VKmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+    public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
         kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
         int i = 1;
         for (; i <= preKmer.getLength(); i++) {
@@ -253,7 +255,7 @@
      *            : input genecode
      * @return new created kmer that shifted by afterCode, the K will not change
      */
-    public VKmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+    public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
         this.kmer.set(kmer);
         this.kmer.shiftKmerWithNextCode(afterCode);
         return this.kmer;
@@ -271,7 +273,7 @@
      *            : input genecode
      * @return new created kmer that shifted by preCode, the K will not change
      */
-    public VKmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+    public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
         this.kmer.set(kmer);
         this.kmer.shiftKmerWithPreCode(preCode);
         return this.kmer;
@@ -282,7 +284,7 @@
      * 
      * @param kmer
      */
-    public VKmerBytesWritable reverse(final KmerBytesWritable kmer) {
+    public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
         this.kmer.reset(kmer.getKmerLength());
 
         int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
deleted file mode 100644
index fab7001..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class KmerCountValue implements Writable {
-    private byte adjBitMap;
-    private byte count;
-
-    public KmerCountValue(byte bitmap, byte count) {
-        set(bitmap, count);
-    }
-
-    public KmerCountValue() {
-        adjBitMap = 0;
-        count = 0;
-    }
-
-    @Override
-    public void readFields(DataInput arg0) throws IOException {
-        adjBitMap = arg0.readByte();
-        count = arg0.readByte();
-    }
-
-    @Override
-    public void write(DataOutput arg0) throws IOException {
-        arg0.writeByte(adjBitMap);
-        arg0.writeByte(count);
-    }
-
-    @Override
-    public String toString() {
-        return GeneCode.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
-    }
-
-    public void set(byte bitmap, byte count) {
-        this.adjBitMap = bitmap;
-        this.count = count;
-    }
-
-    public byte getAdjBitMap() {
-        return adjBitMap;
-    }
-
-    public void setAdjBitMap(byte adjBitMap) {
-        this.adjBitMap = adjBitMap;
-    }
-
-    public byte getCount() {
-        return count;
-    }
-}
\ No newline at end of file
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
deleted file mode 100644
index 0dc24a5..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class Position implements Writable {
-    public int readID;
-    public byte posInRead;
-
-    public Position() {
-        readID = 0;
-        posInRead = 0;
-    }
-
-    public Position(int readID, byte posInRead) {
-        this.readID = readID;
-        this.posInRead = posInRead;
-    }
-
-    public Position(final Position pos) {
-        this.readID = pos.readID;
-        this.posInRead = pos.posInRead;
-    }
-
-    public void set(int readID, byte posInRead) {
-        this.readID = readID;
-        this.posInRead = posInRead;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        readID = in.readInt();
-        posInRead = in.readByte();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(readID);
-        out.writeByte(posInRead);
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
deleted file mode 100644
index abedad6..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type;
-
-public class VKmerBytesWritable extends KmerBytesWritable {
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    @Deprecated
-    public VKmerBytesWritable() {
-        super();
-    }
-
-    public VKmerBytesWritable(int k, byte[] storage) {
-        super(k, storage);
-    }
-
-    public VKmerBytesWritable(int k) {
-        super(k);
-    }
-
-    public VKmerBytesWritable(KmerBytesWritable other) {
-        super(other);
-    }
-
-    protected void setSize(int size) {
-        if (size > getCapacity()) {
-            setCapacity((size * 3 / 2));
-        }
-        this.size = size;
-    }
-
-    protected int getCapacity() {
-        return bytes.length;
-    }
-
-    protected void setCapacity(int new_cap) {
-        if (new_cap != getCapacity()) {
-            byte[] new_data = new byte[new_cap];
-            if (new_cap < size) {
-                size = new_cap;
-            }
-            if (size != 0) {
-                System.arraycopy(bytes, 0, new_data, 0, size);
-            }
-            bytes = new_data;
-        }
-    }
-
-    /**
-     * Read Kmer from read text into bytes array e.g. AATAG will compress as
-     * [0x000G, 0xATAA]
-     * 
-     * @param k
-     * @param array
-     * @param start
-     */
-    public void setByRead(int k, byte[] array, int start) {
-        reset(k);
-        super.setByRead(array, start);
-    }
-
-    /**
-     * Compress Reversed Kmer into bytes array AATAG will compress as
-     * [0x000A,0xATAG]
-     * 
-     * @param input
-     *            array
-     * @param start
-     *            position
-     */
-    public void setByReadReverse(int k, byte[] array, int start) {
-        reset(k);
-        super.setByReadReverse(array, start);
-    }
-
-    @Override
-    public void set(KmerBytesWritable newData) {
-        if (newData == null){
-            this.set(0,null,0,0);
-        }else{
-            this.set(newData.kmerlength, newData.bytes, 0, newData.size);
-        }
-    }
-
-    public void set(int k, byte[] newData, int offset, int length) {
-        reset(k);
-        if (k > 0 ){
-            System.arraycopy(newData, offset, bytes, 0, size);
-        }
-    }
-
-    /**
-     * Reset array by kmerlength
-     * 
-     * @param k
-     */
-    public void reset(int k) {
-        this.kmerlength = k;
-        setSize(0);
-        setSize(KmerUtil.getByteNumFromK(k));
-        clearLeadBit();
-    }
-
-}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableFactoryTest.java
similarity index 88%
rename from genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
rename to genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableFactoryTest.java
index 7c4c675..54d5926 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableFactoryTest.java
@@ -20,22 +20,12 @@
 
 import edu.uci.ics.genomix.type.GeneCode;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
 
-public class VKmerBytesWritableFactoryTest {
+public class KmerBytesWritableFactoryTest {
     static byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
 
-    VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(8);
-
-    @Test
-    public void TestDegree() {
-        Assert.assertTrue(GeneCode.inDegree((byte) 0xff) == 4);
-        Assert.assertTrue(GeneCode.outDegree((byte) 0xff) == 4);
-        Assert.assertTrue(GeneCode.inDegree((byte) 0x3f) == 2);
-        Assert.assertTrue(GeneCode.outDegree((byte) 0x01) == 1);
-        Assert.assertTrue(GeneCode.inDegree((byte) 0x01) == 0);
-    }
+    KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(8);
 
     @Test
     public void TestGetLastKmer() {
@@ -49,7 +39,7 @@
             lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
         }
-        VKmerBytesWritable vlastKmer;
+        KmerBytesWritable vlastKmer;
         for (int i = 8; i > 0; i--) {
             vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
@@ -70,7 +60,7 @@
             firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
         }
-        VKmerBytesWritable vfirstKmer;
+        KmerBytesWritable vfirstKmer;
         for (int i = 8; i > 0; i--) {
             vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
             Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
@@ -84,7 +74,7 @@
         KmerBytesWritable kmer = new KmerBytesWritable(9);
         kmer.setByRead(array, 0);
         Assert.assertEquals("AGCTGACCG", kmer.toString());
-        VKmerBytesWritable subKmer;
+        KmerBytesWritable subKmer;
         for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
             for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
                 subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
@@ -168,7 +158,7 @@
         KmerBytesWritable kmer5 = new KmerBytesWritable(7);
         kmer5.setByRead(array, 0);
         String text5 = "AGCTGAC";
-        VKmerBytesWritable kmer6 = new VKmerBytesWritable(9);
+        KmerBytesWritable kmer6 = new KmerBytesWritable(9);
         kmer6.setByRead(9, array, 1);
         String text6 = "GCTGACCGT";
         merged = kmerFactory.mergeTwoKmer(kmer5, kmer6);
@@ -188,14 +178,14 @@
 
     @Test
     public void TestShift() {
-        VKmerBytesWritable kmer = new VKmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
+        KmerBytesWritable kmer = new KmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
         String text = "AGCTGACCG";
         Assert.assertEquals(text, kmer.toString());
 
-        VKmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
+        KmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
         Assert.assertEquals(text, kmer.toString());
         Assert.assertEquals("GCTGACCGA", kmerForward.toString());
-        VKmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
+        KmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
         Assert.assertEquals(text, kmer.toString());
         Assert.assertEquals("CAGCTGACC", kmerBackward.toString());
 
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
index faee509..a5a4430 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
@@ -90,4 +90,21 @@
         }
     }
 
+    @Test
+    public void TestMergeNext() {
+        KmerBytesWritable kmer = new KmerBytesWritable(9);
+        byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
+        kmer.setByRead(array, 0);
+        Assert.assertEquals("AGCTGACCG", kmer.toString());
+
+        String text = "AGCTGACCG";
+        for (int i = 0; i < 10; i++) { // 40 appends in total, growing the kmer from length 9 to 49
+            for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+                kmer.mergeKmerWithNextCode(x);
+                text = text + (char) GeneCode.GENE_SYMBOL[x]; // expected string grows by the same symbol
+                Assert.assertEquals(text, kmer.toString());
+            }
+        }
+    }
+
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 42638a9..4e99865 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -1,20 +1,29 @@
 package edu.uci.ics.genomix.hyracks.data.primitive;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-public class NodeReference {
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+public class NodeReference implements WritableComparable<NodeReference> {
     private PositionReference nodeID;
     private int countOfKmer;
     private PositionListReference incomingList;
     private PositionListReference outgoingList;
+    private KmerBytesWritable kmer;
 
-    public NodeReference() {
+    public NodeReference(int kmerSize) {
         nodeID = new PositionReference();
         countOfKmer = 0;
         incomingList = new PositionListReference();
         outgoingList = new PositionListReference();
+        kmer = new KmerBytesWritable(kmerSize);
     }
-    
-    public int getCount(){
+
+    public int getCount() {
         return countOfKmer;
     }
 
@@ -57,11 +66,14 @@
         return nodeID;
     }
 
+    public KmerBytesWritable getKmer() {
+        return kmer; // the node's own instance, not a copy: caller mutations are visible here
+    }
+
     public void mergeNextWithinOneRead(NodeReference nextNodeEntry) {
-        this.countOfKmer += nextNodeEntry.countOfKmer;
-        for(PositionReference pos : nextNodeEntry.getOutgoingList()){
-            this.outgoingList.append(pos);
-        }
+        this.countOfKmer += 1;
+        this.outgoingList.set(nextNodeEntry.outgoingList);
+        kmer.mergeKmerWithNextCode(nextNodeEntry.kmer.getGeneCodeAtPosition(nextNodeEntry.kmer.getKmerLength() - 1));
     }
 
     public void set(NodeReference node) {
@@ -69,6 +81,34 @@
         this.countOfKmer = node.countOfKmer;
         this.incomingList.set(node.getIncomingList());
         this.outgoingList.set(node.getOutgoingList());
+        this.kmer.set(node.kmer);
     }
 
+    @Override
+    public void readFields(DataInput in) throws IOException { // must consume fields in the order write() emits them
+        this.nodeID.readFields(in);
+        this.countOfKmer = in.readInt();
+        this.incomingList.readFields(in);
+        this.outgoingList.readFields(in);
+        this.kmer.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException { // layout: nodeID, count (int), incoming, outgoing, kmer
+        this.nodeID.write(out);
+        out.writeInt(this.countOfKmer);
+        this.incomingList.write(out);
+        this.outgoingList.write(out);
+        this.kmer.write(out);
+    }
+
+    @Override
+    public int compareTo(NodeReference other) {
+        return this.nodeID.compareTo(other.nodeID); // ordering uses nodeID only; count, lists and kmer are ignored
+    }
+
+    @Override
+    public int hashCode() { // NOTE(review): no equals() override is visible in this hunk — confirm one exists
+        return nodeID.hashCode(); // hashes on nodeID only, the same key compareTo orders by
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 895c644..d4c8f7b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -23,17 +23,11 @@
         this.valueCount = 0;
         this.offset = 0;
     }
-    
-    public PositionListReference(int count, byte [] data, int offset){
+
+    public PositionListReference(int count, byte[] data, int offset) {
         setNewReference(count, data, offset);
     }
 
-    public void setNewReference(int count, byte[] data, int offset){
-        this.valueCount = count;
-        this.storage = data;
-        this.offset = offset;
-    }
-    
     protected void setSize(int size) {
         if (size > getCapacity()) {
             setCapacity((size * 3 / 2));
@@ -48,26 +42,13 @@
         if (new_cap > getCapacity()) {
             byte[] new_data = new byte[new_cap];
             if (storage.length - offset > 0) {
-                System.arraycopy(storage, offset, new_data, 0, storage.length-offset);
+                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
             }
             storage = new_data;
             offset = 0;
         }
     }
 
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        this.valueCount = in.readInt();
-        setSize(valueCount * PositionReference.LENGTH);
-        in.readFully(storage, offset, valueCount * PositionReference.LENGTH);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(valueCount);
-        out.write(storage, offset, valueCount * PositionReference.LENGTH);
-    }
-
     public PositionReference getPosition(int i) {
         if (i >= valueCount) {
             throw new ArrayIndexOutOfBoundsException("No such positions");
@@ -75,7 +56,7 @@
         posIter.setNewReference(storage, offset + i * PositionReference.LENGTH);
         return posIter;
     }
-    
+
     @Override
     public Iterator<PositionReference> iterator() {
         Iterator<PositionReference> it = new Iterator<PositionReference>() {
@@ -100,6 +81,12 @@
         return it;
     }
 
+    public void setNewReference(int count, byte[] data, int offset) { // re-points this list at external bytes
+        this.valueCount = count; // number of positions, not a byte length
+        this.storage = data; // aliases the caller's array; nothing is copied
+        this.offset = offset;
+    }
+
     public void set(PositionListReference list2) {
         set(list2.valueCount, list2.storage, list2.offset);
     }
@@ -118,8 +105,8 @@
 
     public void append(PositionReference pos) {
         setSize((1 + valueCount) * PositionReference.LENGTH);
-        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount * PositionReference.LENGTH,
-                pos.getLength());
+        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+                * PositionReference.LENGTH, pos.getLength());
         valueCount += 1;
     }
 
@@ -146,7 +133,20 @@
 
     @Override
     public int getLength() {
-        return valueCount * PositionReference.LENGTH ;
+        return valueCount * PositionReference.LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt(); // leading int is the number of positions, not a byte length
+        setSize(valueCount * PositionReference.LENGTH); // grows the backing array when the payload would not fit
+        in.readFully(storage, offset, valueCount * PositionReference.LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(storage, offset, valueCount * PositionReference.LENGTH);
     }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index 100b74d..29e894b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -4,12 +4,13 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-public class PositionReference implements IValueReference,Writable {
+public class PositionReference implements IValueReference, WritableComparable<PositionReference> {
     private byte[] storage;
     private int offset;
     public static final int LENGTH = 5;
@@ -72,4 +73,51 @@
         out.write(storage, offset, LENGTH);
     }
 
+    @Override
+    public int hashCode() {
+        return this.getReadID(); // readID alone is the hash, so every position of one read hashes alike
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // Equal only to another PositionReference carrying the same readID and posInRead.
+        boolean sameType = o instanceof PositionReference;
+        PositionReference that = sameType ? (PositionReference) o : null;
+        return sameType && this.getReadID() == that.getReadID() && this.getPosInRead() == that.getPosInRead();
+    }
+
+    @Override
+    public int compareTo(PositionReference other) { // orders by readID first, then by posInRead
+        if (this.getReadID() != other.getReadID()) {
+            // Compare rather than subtract: the int difference of two readIDs can overflow and flip the sign.
+            return this.getReadID() < other.getReadID() ? -1 : 1;
+        }
+        return this.getPosInRead() - other.getPosInRead(); // NOTE(review): safe while posInRead is byte-sized — confirm
+    }
+
+    @Override
+    public String toString() {
+        return "(" + getReadID() + "," + (int) getPosInRead() + ")"; // rendered as "(readID,posInRead)"
+    }
+
+    /** A raw-bytes Comparator for PositionReference: orders by the leading int, then by the byte that follows it. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(PositionReference.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            int thisValue = IntegerSerializerDeserializer.getInt(b1, s1);
+            int thatValue = IntegerSerializerDeserializer.getInt(b2, s2);
+            if (thisValue != thatValue) {
+                // Compare rather than subtract: the int difference can overflow and flip the sign.
+                return thisValue < thatValue ? -1 : 1;
+            }
+            return b1[s1 + INTBYTES] - b2[s2 + INTBYTES]; // byte difference cannot overflow an int
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(PositionReference.class, new Comparator());
+    }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index 4921da1..9d0a3c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -1,8 +1,12 @@
 package edu.uci.ics.genomix.hyracks.dataflow;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionListReference;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -18,15 +22,17 @@
 
 public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
 
-    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc) {
+    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
         super(spec, 1, 1);
         recordDescriptors[0] = outRecDesc;
+        this.kmerSize = kmerSize;
     }
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
+    private final int kmerSize;
 
     public static final int InputReadIDField = 0;
     public static final int InputInfoFieldStart = 1;
@@ -38,10 +44,11 @@
     public static final int OutputKmerBytesField = 4;
 
     /**
-     * (ReadID, Storage[posInRead]={PositionList,Kmer})
-     * to Position, LengthCount, InComingPosList, OutgoingPosList, Kmer
+     * (ReadID, Storage[posInRead]={len, PositionList, len, Kmer})
+     * to (Position, LengthCount, InComingPosList, OutgoingPosList, Kmer)
      */
     public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        public static final int INT_LENGTH = 4;
         private final IHyracksTaskContext ctx;
         private final RecordDescriptor inputRecDesc;
         private final RecordDescriptor outputRecDesc;
@@ -55,12 +62,12 @@
         private NodeReference nextNodeEntry;
 
         public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
-                RecordDescriptor outputRecDesc) {
+                RecordDescriptor outputRecDesc, int kmerSize) {
             this.ctx = ctx;
             this.inputRecDesc = inputRecDesc;
             this.outputRecDesc = outputRecDesc;
-            curNodeEntry = new NodeReference();
-            nextNodeEntry = new NodeReference();
+            curNodeEntry = new NodeReference(kmerSize);
+            nextNodeEntry = new NodeReference(kmerSize);
         }
 
         @Override
@@ -83,12 +90,16 @@
             }
         }
 
-        private void generateNodeFromRead(int tIndex) {
+        private void generateNodeFromRead(int tIndex) throws HyracksDataException {
             int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
-            resetNode(curNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+            int readID = accessor.getBuffer().getInt(
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+            resetNode(curNodeEntry, readID, (byte) 0,
+                    offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), false);
 
             for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
-                setNodeRef(nextNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, i));
+                resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
+                        offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
                 if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
                     curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
                 } else {
@@ -96,24 +107,77 @@
                     curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
                     outputNode(curNodeEntry);
                     nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
-                    curNodeEntry.set( nextNodeEntry);
+                    curNodeEntry.set(nextNodeEntry);
                 }
             }
             outputNode(curNodeEntry);
         }
 
-        private void outputNode(NodeReference node) {
-            // TODO Auto-generated method stub
-            
+        private void outputNode(NodeReference node) throws HyracksDataException { // emits one node as a 5-field tuple
+            try {
+                builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
+                        .getLength());
+                builder.getDataOutput().writeInt(node.getCount()); // count is written raw, so its field is closed by hand
+                builder.addFieldEndOffset();
+                builder.addField(node.getIncomingList().getByteArray(), node.getIncomingList().getStartOffset(), node
+                        .getIncomingList().getLength());
+                builder.addField(node.getOutgoingList().getByteArray(), node.getOutgoingList().getStartOffset(), node
+                        .getOutgoingList().getLength());
+                builder.addField(node.getKmer().getBytes(), node.getKmer().getOffset(), node.getKmer().getLength());
+
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer); // frame is full: flush it and retry once
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                        throw new IllegalStateException("Failed to append tuplebuilder to frame");
+                    }
+                }
+                builder.reset(); // leave the builder clean for the next node
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to Add a field to the tupleBuilder.", e); // keep the cause
+            }
         }
 
-        private void setNodeRef(NodeReference node, int i) {
-            // TODO Auto-generated method stub
-            
+        private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean byRef) {
+            node.reset();
+            node.setNodeID(readID, posInRead);
+
+            ByteBuffer buffer = accessor.getBuffer();
+            int lengthOfPosition = buffer.getInt(offset); // byte length of the position list that follows
+            if (lengthOfPosition % PositionReference.LENGTH != 0) {
+                throw new IllegalStateException("Size of PositionList is invalid ");
+            }
+            offset += INT_LENGTH; // step over the length prefix
+            if (posInRead == 0) { // the first kmer of a read keeps its list as incoming, later ones as outgoing
+                setPositionList(node.getIncomingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+                        offset, byRef);
+            } else {
+                setPositionList(node.getOutgoingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+                        offset, byRef);
+            }
+            offset += lengthOfPosition; // now at the kmer's length prefix
+            int lengthKmer = buffer.getInt(offset); // must equal the byte length of the node's preallocated kmer
+            if (node.getKmer().getLength() != lengthKmer) {
+                throw new IllegalStateException("Size of Kmer is invalid ");
+            }
+            setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, byRef);
+            node.setCount(1); // a freshly loaded node stands for a single kmer
         }
 
-        private void resetNode(NodeReference node, int i) {
-            
+        private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean byRef) {
+            if (byRef) {
+                kmer.setNewReference(array, offset); // presumably aliases array without copying — confirm
+            } else {
+                kmer.set(array, offset); // presumably copies the bytes into the kmer's own storage — confirm
+            }
+        }
+
+        private void setPositionList(PositionListReference list, int count, byte[] array, int offset, boolean byRef) {
+            if (byRef) {
+                list.setNewReference(count, array, offset); // list now points straight into array; nothing is copied
+            } else {
+                list.set(count, array, offset); // presumably copies count positions into the list — confirm
+            }
         }
 
         @Override
@@ -136,7 +200,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         // TODO Auto-generated method stub
         return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
-                recordDescriptors[0]);
+                recordDescriptors[0], kmerSize);
     }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 455b5c5..0772eb3 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -26,7 +26,6 @@
 
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -54,7 +53,6 @@
         ConfFactory cf;
         Writer writer = null;
 
-        KmerCountValue reEnterCount = new KmerCountValue();
         KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
 
         /**
@@ -66,12 +64,15 @@
                 byte[] kmer = tuple.getFieldData(0);
                 int keyStart = tuple.getFieldStart(0);
                 int keyLength = tuple.getFieldLength(0);
+                if (reEnterKey.getLength() > keyLength){
+                    throw new IllegalArgumentException("Not enough kmer bytes");
+                }
 
                 byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
                 byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-                reEnterCount.set(bitmap, count);
-                reEnterKey.set(kmer, keyStart, keyLength);
-                writer.append(reEnterKey, reEnterCount);
+//                reEnterCount.set(bitmap, count);
+                reEnterKey.set(kmer, keyStart);
+                writer.append(reEnterKey, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -81,7 +82,7 @@
         public void open(DataOutput output) throws HyracksDataException {
             try {
                 writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
-                        KmerCountValue.class, CompressionType.NONE, null);
+                        null, CompressionType.NONE, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index b8f99ef..b9b7213 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -42,10 +42,10 @@
         @Override
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             try {
-                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
+                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0));
                 output.write(kmer.toString().getBytes());
                 output.writeByte('\t');
-                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+//                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
                 output.writeByte('\t');
                 output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
                 output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index ea5b58f..47fec00 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -230,7 +230,7 @@
         logDebug("Map ReadInfo to Node");
         //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
         RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
-        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec);
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
         connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
index 240d2ee..adce3f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
 import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.hyracks.util.StatReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -107,7 +106,7 @@
 
             String[] readSchedule = scheduler.getLocationConstraints(splits);
             return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
-                    new StatReadsKeyValueParserFactory());
+                    null); //new StatReadsKeyValueParserFactory());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
deleted file mode 100644
index c01aae1..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.util;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
-            throws HyracksDataException {
-
-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputBuffer, true);
-
-        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
-
-            @Override
-            public void open(IFrameWriter writer) throws HyracksDataException {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
-                    throws HyracksDataException {
-                byte adjMap = value.getAdjBitMap();
-                byte count = value.getCount();
-                InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
-            }
-
-            @Override
-            public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-
-            private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
-                try {
-                    tupleBuilder.reset();
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
-
-                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy an record into a frame: the record size is too large.");
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        };
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index ba9aea2..91ec530 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,7 +42,6 @@
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
 
 public class JobRunTest {
     private static final String ACTUAL_RESULT_DIR = "actual";
@@ -213,8 +211,8 @@
                 //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                 KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
                         GenomixJob.DEFAULT_KMERLEN));
-                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
+//                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                KmerBytesWritable value = null;
                 while (reader.next(key, value)) {
                     if (key == null || value == null) {
                         break;