Merge branch 'wbiesing/genomix/VKmers' into nanzhang/hyracks_genomix

Conflicts:
	genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
	genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
	genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
	genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
	genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
	genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index c38e35d..df93069 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -26,7 +26,8 @@
 
 import edu.uci.ics.genomix.data.KmerUtil;
 import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+
 
 /**
  * Variable-length kmer which stores its length internally.
@@ -497,6 +498,11 @@
         clearLeadBit();
         saveHeader(lettersInKmer);
     }
+    
+    public void mergeWithFFKmer(int kmerSize, KmerBytesWritable kmer) {
+        // TODO make this more efficient
+        mergeWithFFKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+    }
 
     /**
      * Merge Kmer with the next connected Kmer, when that Kmer needs to be
@@ -538,6 +544,11 @@
         }
         saveHeader(lettersInKmer);
     }
+    
+    public void mergeWithFRKmer(int kmerSize, KmerBytesWritable kmer) {
+        // TODO make this more efficient
+        mergeWithFRKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+    }
 
     /**
      * Merge Kmer with the previous connected Kmer, when that kmer needs to be
@@ -550,10 +561,16 @@
      *            : the previous kmer
      */
     public void mergeWithRFKmer(int initialKmerSize, VKmerBytesWritable preKmer) {
+        // TODO make this more efficient
         VKmerBytesWritable reversed = new VKmerBytesWritable(preKmer.lettersInKmer);
         reversed.setByReadReverse(preKmer.toString().getBytes(), 0);
         mergeWithRRKmer(initialKmerSize, reversed);
     }
+    
+    public void mergeWithRFKmer(int kmerSize, KmerBytesWritable kmer) {
+        // TODO make this more efficient
+        mergeWithRFKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+    }
 
     /**
      * Merge Kmer with the previous connected Kmer e.g. AACAACC merge with
@@ -590,6 +607,11 @@
                 bytes, kmerStartOffset, bytesUsed);
         clearLeadBit();
     }
+    
+    public void mergeWithRRKmer(int kmerSize, KmerBytesWritable kmer) {
+        // TODO make this more efficient
+        mergeWithRRKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+    }
 
     public void mergeWithKmerInDir(byte dir, int initialKmerSize, VKmerBytesWritable kmer) {
         switch (dir & DirectionFlag.DIR_MASK) {
@@ -609,5 +631,16 @@
                 throw new RuntimeException("Direction not recognized: " + dir);
         }
     }
+    public void mergeWithKmerInDir(byte dir, int initialKmerSize, KmerBytesWritable kmer) {
+        // TODO make this more efficient
+        mergeWithKmerInDir(dir, initialKmerSize, new VKmerBytesWritable(kmer.toString()));
+    }
+
+    public KmerBytesWritable asFixedLengthKmer() {
+        if (lettersInKmer != KmerBytesWritable.getKmerLength()) {
+            throw new IllegalArgumentException("VKmer " + this.toString() + " is not of the same length as the fixed length Kmer (" + KmerBytesWritable.getKmerLength() + " )!");
+        }
+        return new KmerBytesWritable(bytes, kmerStartOffset);
+    }
 
 }
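
The four merge directions (FF, FR, RF, RR) reduce to simple string operations on the decompressed kmers; the overloads added above only wrap a fixed-length KmerBytesWritable into a VKmerBytesWritable first. A minimal sketch of the merge semantics on plain Java strings (an illustration under that reading, not the Writable implementation; the class and method names below are hypothetical):

    public class MergeSemanticsDemo {
        static String reverseComplement(String s) {
            StringBuilder sb = new StringBuilder();
            for (int i = s.length() - 1; i >= 0; i--) {
                switch (s.charAt(i)) {
                    case 'A': sb.append('T'); break;
                    case 'C': sb.append('G'); break;
                    case 'G': sb.append('C'); break;
                    case 'T': sb.append('A'); break;
                }
            }
            return sb.toString();
        }

        // FF: the next kmer continues this one on the forward strand,
        // overlapping in (k - 1) letters.
        static String mergeFF(int k, String cur, String next) {
            return cur + next.substring(k - 1);
        }

        // RF: the previous kmer arrives on the reverse strand, so it is
        // reverse-complemented and then prepended, as mergeWithRFKmer does above.
        static String mergeRF(int k, String cur, String pre) {
            return reverseComplement(pre) + cur.substring(k - 1);
        }

        public static void main(String[] args) {
            System.out.println(mergeFF(3, "AAGCTAA", "AACAACC")); // AAGCTAACAACC
        }
    }
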
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
new file mode 100644
index 0000000..c3d8a98
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class GeneCode {
+    public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+    /**
+     * make sure these 4 ids equal the index of the corresponding char in
+     * {@link #GENE_SYMBOL}
+     */
+    public static final byte A = 0;
+    public static final byte C = 1;
+    public static final byte G = 2;
+    public static final byte T = 3;
+
+    public static byte getCodeFromSymbol(byte ch) {
+        byte r = 0;
+        switch (ch) {
+            case 'A':
+            case 'a':
+                r = A;
+                break;
+            case 'C':
+            case 'c':
+                r = C;
+                break;
+            case 'G':
+            case 'g':
+                r = G;
+                break;
+            case 'T':
+            case 't':
+                r = T;
+                break;
+        }
+        return r;
+    }
+    
+    public static byte getPairedGeneCode(byte genecode) {
+        if (genecode < 0 || genecode > 3) {
+            throw new IllegalArgumentException("Invalid genecode");
+        }
+        return (byte) (3 - genecode);
+    }
+    
+    public static byte getPairedCodeFromSymbol(byte ch) {
+        return getPairedGeneCode(getCodeFromSymbol(ch));
+    }
+
+    public static byte getSymbolFromCode(byte code) {
+        if (code < 0 || code > 3) {
+            throw new IllegalArgumentException("Invalid genecode");
+        }
+        return GENE_SYMBOL[code];
+    }
+}
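
The 2-bit codes are ordered so that complementary bases always sum to 3 (A=0 with T=3, C=1 with G=2), which is why getPairedGeneCode can compute a complement with a single subtraction. A quick self-contained check of that invariant (the demo class name is hypothetical):

    public class GeneCodeDemo {
        public static void main(String[] args) {
            final char[] symbol = { 'A', 'C', 'G', 'T' };
            for (int code = 0; code < 4; code++) {
                int paired = 3 - code; // A<->T (0<->3) and C<->G (1<->2)
                System.out.println(symbol[code] + " pairs with " + symbol[paired]);
            }
        }
    }
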
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
new file mode 100644
index 0000000..630dbad
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
@@ -0,0 +1,502 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.KmerUtil;
+
+/**
+ * A kmer stored in a compressed byte array.
+ * It is used to generate the graph, a phase in which the kmer length doesn't change;
+ * thus the size of the byte array doesn't change either.
+ */
+public class KmerBytesWritable extends BinaryComparable implements Serializable, WritableComparable<BinaryComparable> {
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+    private static final byte[] EMPTY_BYTES = {};
+
+    protected int size;
+    protected byte[] bytes;
+    protected int offset;
+    protected int kmerlength;
+
+    public KmerBytesWritable() {
+        this(0, EMPTY_BYTES, 0);
+    }
+
+    public KmerBytesWritable(int k, byte[] storage, int offset) {
+        setNewReference(k, storage, offset);
+    }
+    
+    public KmerBytesWritable(int k, String kmer) {
+        this(k);
+        // pack the letters rather than referencing the raw ASCII bytes
+        setByRead(kmer.getBytes(), 0);
+    }
+
+    /**
+     * Initialize the kmer space by kmerlength
+     * 
+     * @param k
+     *            kmerlength
+     */
+    public KmerBytesWritable(int k) {
+        this.kmerlength = k;
+        this.size = KmerUtil.getByteNumFromK(kmerlength);
+        if (k > 0) {
+            this.bytes = new byte[this.size];
+        } else {
+            this.bytes = EMPTY_BYTES;
+        }
+        this.offset = 0;
+    }
+
+    public KmerBytesWritable(KmerBytesWritable right) {
+        this(right.kmerlength);
+        set(right);
+    }
+
+    /**
+     * Deep copy of the given kmer
+     * 
+     * @param newData
+     */
+    public void set(KmerBytesWritable newData) {
+        if (newData == null) {
+            this.set(0, EMPTY_BYTES, 0);
+        } else {
+            this.set(newData.kmerlength, newData.bytes, newData.getOffset());
+        }
+    }
+
+    /**
+     * Deep copy of the given bytes data
+     * It will not change the kmerlength
+     * 
+     * @param newData
+     * @param offset
+     */
+    public void set(byte[] newData, int offset) {
+        if (kmerlength > 0) {
+            System.arraycopy(newData, offset, bytes, this.offset, size);
+        }
+    }
+
+    /**
+     * Deep copy of the given data, and also set to new kmerlength
+     * 
+     * @param k
+     *            : new kmer length
+     * @param newData
+     *            : data storage
+     * @param offset
+     *            : start offset
+     */
+    public void set(int k, byte[] newData, int offset) {
+        reset(k);
+        if (k > 0) {
+            System.arraycopy(newData, offset, bytes, this.offset, size);
+        }
+    }
+
+    /**
+     * Reset array by kmerlength
+     * 
+     * @param k
+     */
+    public void reset(int k) {
+        this.kmerlength = k;
+        setSize(KmerUtil.getByteNumFromK(k));
+        clearLeadBit();
+    }
+
+    /**
+     * Point this datablock to the given bytes array
+     * It works like the pointer to new datablock.
+     * kmerlength will not change
+     * 
+     * @param newData
+     * @param offset
+     */
+    public void setNewReference(byte[] newData, int offset) {
+        this.bytes = newData;
+        this.offset = offset;
+        if (newData.length - offset < size) {
+            throw new IllegalArgumentException("Not given enough space");
+        }
+    }
+
+    /**
+     * Point this datablock to the given bytes array
+     * It works like the pointer to new datablock.
+     * It also set the new kmerlength
+     * 
+     * @param k
+     * @param newData
+     * @param offset
+     */
+    public void setNewReference(int k, byte[] newData, int offset) {
+        this.kmerlength = k;
+        this.size = KmerUtil.getByteNumFromK(k);
+        setNewReference(newData, offset);
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+        this.size = size;
+    }
+
+    protected int getCapacity() {
+        return bytes.length;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap != getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (new_cap < size) {
+                size = new_cap;
+            }
+            if (size != 0) {
+                System.arraycopy(bytes, offset, new_data, 0, size);
+            }
+            bytes = new_data;
+            offset = 0;
+        }
+    }
+
+    /**
+     * Get one gene code (A|C|G|T) at the given position in the kmer,
+     * e.g. getting the 4th gene of the kmer ACGTA returns T
+     * 
+     * @param pos
+     * @return
+     */
+    public byte getGeneCodeAtPosition(int pos) {
+        if (pos >= kmerlength) {
+            throw new IllegalArgumentException("gene position out of bound");
+        }
+        int posByte = pos / 4;
+        int shift = (pos % 4) << 1;
+        return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
+    }
+
+    public int getKmerLength() {
+        return this.kmerlength;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    @Override
+    public int getLength() {
+        return size;
+    }
+
+    /**
+     * Read a kmer from the read text into the bytes array, e.g. AATAG will be
+     * compressed as [0x000G, 0xATAA]
+     * 
+     * @param k
+     * @param array
+     * @param start
+     */
+    public void setByRead(byte[] array, int start) {
+        byte l = 0;
+        int bytecount = 0;
+        int bcount = this.size - 1;
+        for (int i = start; i < start + kmerlength && i < array.length; i++) {
+            byte code = GeneCode.getCodeFromSymbol(array[i]);
+            l |= (byte) (code << bytecount);
+            bytecount += 2;
+            if (bytecount == 8) {
+                bytes[offset + bcount--] = l;
+                l = 0;
+                bytecount = 0;
+            }
+        }
+        if (bcount >= 0) {
+            bytes[offset] = l;
+        }
+    }
+
+    public void setByRead(int k, byte[] array, int start) {
+        reset(k);
+        setByRead(array, start);
+    }
+
+    /**
+     * Compress the reversed read into the bytes array,
+     * e.g. AATAG is paired to CTATT and then compressed as
+     * [0x000T,0xTATC]
+     * 
+     * @param input
+     *            array
+     * @param start
+     *            position
+     */
+    public void setByReadReverse(byte[] array, int start) {
+        byte l = 0;
+        int bytecount = 0;
+        int bcount = size - 1;
+        for (int i = start + kmerlength - 1; i >= start && i < array.length; i--) {
+            byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
+            l |= (byte) (code << bytecount);
+            bytecount += 2;
+            if (bytecount == 8) {
+                bytes[offset + bcount--] = l;
+                l = 0;
+                bytecount = 0;
+            }
+        }
+        if (bcount >= 0) {
+            bytes[offset] = l;
+        }
+    }
+
+    public void setByReadReverse(int k, byte[] array, int start) {
+        reset(k);
+        setByReadReverse(array, start);
+    }
+
+    /**
+     * Shift Kmer to accept new char input
+     * 
+     * @param c
+     *            Input new gene character
+     * @return the shifted-out gene, in gene code format
+     */
+    public byte shiftKmerWithNextChar(byte c) {
+        return shiftKmerWithNextCode(GeneCode.getCodeFromSymbol(c));
+    }
+
+    /**
+     * Shift Kmer to accept new gene code
+     * 
+     * @param c
+     *            Input new gene code
+     * @return the shifted-out gene, in gene code format
+     */
+    public byte shiftKmerWithNextCode(byte c) {
+        byte output = (byte) (bytes[offset + size - 1] & 0x03);
+        for (int i = size - 1; i > 0; i--) {
+            byte in = (byte) (bytes[offset + i - 1] & 0x03);
+            bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
+        }
+        int pos = ((kmerlength - 1) % 4) << 1;
+        byte code = (byte) (c << pos);
+        bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
+        clearLeadBit();
+        return output;
+    }
+
+    /**
+     * Shift Kmer to accept new input char
+     * 
+     * @param c
+     *            Input new gene character
+     * @return the shifted-out gene, in gene code format
+     */
+    public byte shiftKmerWithPreChar(byte c) {
+        return shiftKmerWithPreCode(GeneCode.getCodeFromSymbol(c));
+    }
+
+    /**
+     * Shift Kmer to accept new gene code
+     * 
+     * @param c
+     *            Input new gene code
+     * @return the shifted-out gene, in gene code format
+     */
+    public byte shiftKmerWithPreCode(byte c) {
+        int pos = ((kmerlength - 1) % 4) << 1;
+        byte output = (byte) ((bytes[offset] >> pos) & 0x03);
+        for (int i = 0; i < size - 1; i++) {
+            byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+            bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
+        }
+        bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
+        clearLeadBit();
+        return output;
+    }
+
+    /**
+     * Merge this kmer with the next connected kmer,
+     * e.g. AAGCTAA merged with AACAACC with an initial kmerSize of 3
+     * yields AAGCTAACAACC
+     * 
+     * @param initialKmerSize
+     *            : the initial kmerSize
+     * @param kmer
+     *            : the next kmer
+     */
+    public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+        int preKmerLength = kmerlength;
+        int preSize = size;
+        this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        for (int i = 1; i <= preSize; i++) {
+            bytes[offset + size - i] = bytes[offset + preSize - i];
+        }
+        for (int k = initialKmerSize - 1; k < kmer.getKmerLength(); k += 4) {
+            byte onebyte = getOneByteFromKmerAtPosition(k, kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+            appendOneByteAtPosition(preKmerLength + k - initialKmerSize + 1, onebyte, bytes, offset, size);
+        }
+        clearLeadBit();
+    }
+
+    /**
+     * Merge this kmer with the previous connected kmer,
+     * e.g. AACAACC merged with AAGCTAA with an initial kmerSize of 3
+     * yields AAGCTAACAACC
+     * 
+     * @param initialKmerSize
+     *            : the initial kmerSize
+     * @param preKmer
+     *            : the previous kmer
+     */
+    public void mergePreKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+        int preKmerLength = kmerlength;
+        int preSize = size;
+        this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        byte cacheByte = getOneByteFromKmerAtPosition(0, bytes, offset, preSize);
+
+        // copy prekmer
+        for (int k = 0; k < preKmer.kmerlength - initialKmerSize + 1; k += 4) {
+            byte onebyte = getOneByteFromKmerAtPosition(k, preKmer.bytes, preKmer.offset, preKmer.size);
+            appendOneByteAtPosition(k, onebyte, bytes, offset, size);
+        }
+
+        // copy current kmer
+        int k = 4;
+        for (; k < preKmerLength; k += 4) {
+            byte onebyte = getOneByteFromKmerAtPosition(k, bytes, offset, preSize);
+            appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+            cacheByte = onebyte;
+        }
+        appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+        clearLeadBit();
+    }
+
+    public static void appendOneByteAtPosition(int k, byte onebyte, byte[] buffer, int start, int length) {
+        int position = start + length - 1 - k / 4;
+        if (position < start) {
+            throw new IllegalArgumentException("Buffer for kmer storage is invalid");
+        }
+        int shift = ((k) % 4) << 1;
+        int mask = shift == 0 ? 0 : ((1 << shift) - 1);
+
+        buffer[position] = (byte) ((buffer[position] & mask) | ((0xff & onebyte) << shift));
+        if (position > start && shift != 0) {
+            buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >> (8 - shift))));
+        }
+    }
+
+    public static byte getOneByteFromKmerAtPosition(int k, byte[] buffer, int start, int length) {
+        int position = start + length - 1 - k / 4;
+        if (position < start) {
+            throw new IllegalArgumentException("Buffer of kmer storage is invalid");
+        }
+        int shift = (k % 4) << 1;
+        byte data = (byte) (((0xff) & buffer[position]) >> shift);
+        if (shift != 0 && position > start) {
+            data |= 0xff & (buffer[position - 1] << (8 - shift));
+        }
+        return data;
+    }
+
+    protected void clearLeadBit() {
+        if (kmerlength % 4 != 0) {
+            bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.kmerlength = in.readInt();
+        this.size = KmerUtil.getByteNumFromK(kmerlength);
+        if (this.kmerlength > 0) {
+            if (this.bytes.length < this.size) {
+                this.bytes = new byte[this.size];
+                this.offset = 0;
+            }
+            in.readFully(bytes, offset, size);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(kmerlength);
+        if (kmerlength > 0) {
+            out.write(bytes, offset, size);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() * 31 + this.kmerlength;
+    }
+
+    @Override
+    public boolean equals(Object right_obj) {
+        if (right_obj instanceof KmerBytesWritable)
+            return this.kmerlength == ((KmerBytesWritable) right_obj).kmerlength && super.equals(right_obj);
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
+    }
+
+    public static class Comparator extends WritableComparator {
+        public final int LEAD_BYTES = 4;
+
+        public Comparator() {
+            super(KmerBytesWritable.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            int kmerlength1 = readInt(b1, s1);
+            int kmerlength2 = readInt(b2, s2);
+            if (kmerlength1 == kmerlength2) {
+                return compareBytes(b1, s1 + LEAD_BYTES, l1 - LEAD_BYTES, b2, s2 + LEAD_BYTES, l2 - LEAD_BYTES);
+            }
+            return kmerlength1 - kmerlength2;
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(KmerBytesWritable.class, new Comparator());
+    }
+
+}
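
The layout above packs four letters per byte, two bits each, with the first letter of the kmer in the low-order bits of the last byte; clearLeadBit masks the unused high bits whenever kmerlength is not a multiple of 4. A self-contained sketch of that packing (an illustration of setByRead's layout, not the class itself; the demo names are hypothetical):

    public class KmerPackingDemo {
        // Pack k letters, 2 bits each, first letter in the low bits of the
        // last byte, mirroring setByRead's layout.
        static byte[] pack(String kmer) {
            byte[] bytes = new byte[(kmer.length() + 3) / 4];
            for (int i = 0; i < kmer.length(); i++) {
                int code = "ACGT".indexOf(kmer.charAt(i)); // same codes as GeneCode
                int posByte = i / 4;
                int shift = (i % 4) << 1;
                bytes[bytes.length - 1 - posByte] |= (byte) (code << shift);
            }
            return bytes;
        }

        public static void main(String[] args) {
            for (byte b : pack("AATAG")) {
                System.out.printf("0x%02x ", b); // prints 0x02 0x30, i.e. [0x000G, 0xATAA]
            }
        }
    }
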
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
new file mode 100644
index 0000000..b0aaebc
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class KmerBytesWritableFactory {
+    private KmerBytesWritable kmer;
+
+    public KmerBytesWritableFactory(int k) {
+        kmer = new KmerBytesWritable(k);
+    }
+
+    /**
+     * Read a kmer from the read text into the bytes array, e.g. AATAG will be
+     * compressed as [0x000G, 0xATAA]
+     * 
+     * @param k
+     * @param array
+     * @param start
+     */
+    public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByRead(array, start);
+        return kmer;
+    }
+
+    /**
+     * Compress the reverse complement of the read into the bytes array,
+     * e.g. AATAG is paired to CTATT and compressed as [0x000T,0xTATC]
+     * 
+     * @param array
+     * @param start
+     */
+    public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+        kmer.reset(k);
+        kmer.setByReadReverse(array, start);
+        return kmer;
+    }
+
+    /**
+     * Get the last kmer from a kmer-chain,
+     * e.g. if kmerChain is AAGCTA and lastK = 5, it will
+     * return AGCTA
+     * 
+     * @param lastK
+     * @param kmerChain
+     * @return the last kmer
+     */
+    public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+        if (lastK > kmerChain.getKmerLength()) {
+            return null;
+        }
+        if (lastK == kmerChain.getKmerLength()) {
+            kmer.set(kmerChain);
+            return kmer;
+        }
+        kmer.reset(lastK);
+
+        /** from end to start */
+        int byteInChain = kmerChain.getLength() - 1 - (kmerChain.getKmerLength() - lastK) / 4;
+        int posInByteOfChain = ((kmerChain.getKmerLength() - lastK) % 4) << 1; // *2
+        int byteInKmer = kmer.getLength() - 1;
+        for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+            kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+            kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+        }
+
+        /** last kmer byte */
+        if (byteInKmer == 0) {
+            kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+        }
+        kmer.clearLeadBit();
+        return kmer;
+    }
+
+    /**
+     * Get the first kmer from a kmer-chain,
+     * e.g. if kmerChain is AAGCTA and firstK = 5, it will
+     * return AAGCT
+     * 
+     * @param firstK
+     * @param kmerChain
+     * @return the first kmer
+     */
+    public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+        if (firstK > kmerChain.getKmerLength()) {
+            return null;
+        }
+        if (firstK == kmerChain.getKmerLength()) {
+            kmer.set(kmerChain);
+            return kmer;
+        }
+        kmer.reset(firstK);
+
+        int i = 1;
+        for (; i < kmer.getLength(); i++) {
+            kmer.getBytes()[kmer.getLength() - i] = kmerChain.getBytes()[kmerChain.getLength() - i];
+        }
+        int posInByteOfChain = (firstK % 4) << 1; // *2
+        if (posInByteOfChain == 0) {
+            kmer.getBytes()[0] = kmerChain.getBytes()[kmerChain.getLength() - i];
+        } else {
+            kmer.getBytes()[0] = (byte) (kmerChain.getBytes()[kmerChain.getLength() - i] & ((1 << posInByteOfChain) - 1));
+        }
+        kmer.clearLeadBit();
+        return kmer;
+    }
+
+    public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+        if (startK + kSize > kmerChain.getKmerLength()) {
+            return null;
+        }
+        if (startK == 0 && kSize == kmerChain.getKmerLength()) {
+            kmer.set(kmerChain);
+            return kmer;
+        }
+        kmer.reset(kSize);
+
+        /** from end to start */
+        int byteInChain = kmerChain.getLength() - 1 - startK / 4;
+        int posInByteOfChain = startK % 4 << 1; // *2
+        int byteInKmer = kmer.getLength() - 1;
+        for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+            kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+            kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+        }
+
+        /** last kmer byte */
+        if (byteInKmer == 0) {
+            kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+        }
+        kmer.clearLeadBit();
+        return kmer;
+    }
+
+    /**
+     * Merge the kmer with its next neighbor in gene-code format.
+     * The k of the new kmer increases by 1,
+     * e.g. AAGCT merged with A => AAGCTA
+     * 
+     * @param kmer
+     *            : input kmer
+     * @param nextCode
+     *            : next neighbor in gene-code format
+     * @return the merged kmer; its k is the input k + 1
+     */
+    public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+        this.kmer.reset(kmer.getKmerLength() + 1);
+        for (int i = 1; i <= kmer.getLength(); i++) {
+            this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
+        }
+        if (this.kmer.getLength() > kmer.getLength()) {
+            this.kmer.getBytes()[0] = (byte) (nextCode & 0x3);
+        } else {
+            this.kmer.getBytes()[0] = (byte) (kmer.getBytes()[0] | ((nextCode & 0x3) << ((kmer.getKmerLength() % 4) << 1)));
+        }
+        this.kmer.clearLeadBit();
+        return this.kmer;
+    }
+
+    /**
+     * Merge the kmer with its previous neighbor in gene-code format.
+     * The k of the new kmer increases by 1,
+     * e.g. AAGCT merged with A => AAAGCT
+     * 
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : previous neighbor in gene-code format
+     * @return the merged kmer; its k is the input k + 1
+     */
+    public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+        this.kmer.reset(kmer.getKmerLength() + 1);
+        int byteInMergedKmer = 0;
+        if (kmer.getKmerLength() % 4 == 0) {
+            this.kmer.getBytes()[0] = (byte) ((kmer.getBytes()[0] >> 6) & 0x3);
+            byteInMergedKmer++;
+        }
+        for (int i = 0; i < kmer.getLength() - 1; i++, byteInMergedKmer++) {
+            this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[i] << 2) | ((kmer.getBytes()[i + 1] >> 6) & 0x3));
+        }
+        this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[kmer.getLength() - 1] << 2) | (preCode & 0x3));
+        this.kmer.clearLeadBit();
+        return this.kmer;
+    }
+
+    /**
+     * Merge two kmers into one,
+     * e.g. ACTA + ACCGT => ACTAACCGT
+     * 
+     * @param preKmer
+     *            : the previous kmer
+     * @param nextKmer
+     *            : the next kmer
+     * @return the merged kmer; the new k is the sum of the two input k values
+     */
+    public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+        kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
+        int i = 1;
+        for (; i <= preKmer.getLength(); i++) {
+            kmer.getBytes()[kmer.getLength() - i] = preKmer.getBytes()[preKmer.getLength() - i];
+        }
+        if (i > 1) {
+            i--;
+        }
+        if (preKmer.getKmerLength() % 4 == 0) {
+            for (int j = 1; j <= nextKmer.getLength(); j++) {
+                kmer.getBytes()[kmer.getLength() - i - j] = nextKmer.getBytes()[nextKmer.getLength() - j];
+            }
+        } else {
+            int posNeedToMove = ((preKmer.getKmerLength() % 4) << 1);
+            kmer.getBytes()[kmer.getLength() - i] |= nextKmer.getBytes()[nextKmer.getLength() - 1] << posNeedToMove;
+            for (int j = 1; j < nextKmer.getLength(); j++) {
+                kmer.getBytes()[kmer.getLength() - i - j] = (byte) (((nextKmer.getBytes()[nextKmer.getLength() - j] & 0xff) >> (8 - posNeedToMove)) | (nextKmer
+                        .getBytes()[nextKmer.getLength() - j - 1] << posNeedToMove));
+            }
+            if (nextKmer.getKmerLength() % 4 == 0 || (nextKmer.getKmerLength() % 4) * 2 + posNeedToMove > 8) {
+                kmer.getBytes()[0] = (byte) ((0xff & nextKmer.getBytes()[0]) >> (8 - posNeedToMove));
+            }
+        }
+        kmer.clearLeadBit();
+        return kmer;
+    }
+
+    /**
+     * Safely shift the kmer forward without changing the input kmer,
+     * e.g. AGCGC shifted with T => GCGCT
+     * 
+     * @param kmer
+     *            : input kmer
+     * @param afterCode
+     *            : input gene code
+     * @return the kmer shifted by afterCode; the k will not change
+     */
+    public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+        this.kmer.set(kmer);
+        this.kmer.shiftKmerWithNextCode(afterCode);
+        return this.kmer;
+    }
+
+    /**
+     * Safely shift the kmer backward without changing the input kmer,
+     * e.g. AGCGC shifted with T => TAGCG
+     * 
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : input gene code
+     * @return the kmer shifted by preCode; the k will not change
+     */
+    public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+        this.kmer.set(kmer);
+        this.kmer.shiftKmerWithPreCode(preCode);
+        return this.kmer;
+    }
+
+    /**
+     * Get the reverse sequence of the given kmer
+     * 
+     * @param kmer
+     */
+    public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
+        this.kmer.reset(kmer.getKmerLength());
+
+        int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
+        int curByteAtKmer = 0;
+
+        int curPosAtReverse = 0;
+        int curByteAtReverse = this.kmer.getLength() - 1;
+        this.kmer.getBytes()[curByteAtReverse] = 0;
+        for (int i = 0; i < kmer.getKmerLength(); i++) {
+            byte gene = (byte) ((kmer.getBytes()[curByteAtKmer] >> curPosAtKmer) & 0x03);
+            this.kmer.getBytes()[curByteAtReverse] |= gene << curPosAtReverse;
+            curPosAtReverse += 2;
+            if (curPosAtReverse >= 8) {
+                curPosAtReverse = 0;
+                this.kmer.getBytes()[--curByteAtReverse] = 0;
+            }
+            curPosAtKmer -= 2;
+            if (curPosAtKmer < 0) {
+                curPosAtKmer = 6;
+                curByteAtKmer++;
+            }
+        }
+        this.kmer.clearLeadBit();
+        return this.kmer;
+    }
+}
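
One caveat: the factory reuses a single internal KmerBytesWritable for every result, so a returned kmer must be deep-copied before the next factory call overwrites it. A hypothetical usage sketch, assuming the oldtype classes from this diff are on the classpath:

    import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
    import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritableFactory;

    public class KmerFactoryDemo {
        public static void main(String[] args) {
            KmerBytesWritable chain = new KmerBytesWritable(6);
            chain.setByRead("AAGCTA".getBytes(), 0); // chain = AAGCTA

            KmerBytesWritableFactory factory = new KmerBytesWritableFactory(6);
            KmerBytesWritable last = factory.getLastKmerFromChain(5, chain); // AGCTA
            // Deep-copy before the next call: the factory reuses this instance.
            KmerBytesWritable lastCopy = new KmerBytesWritable(last);
            KmerBytesWritable first = factory.getFirstKmerFromChain(5, chain); // AAGCT
            System.out.println(lastCopy + " " + first); // AGCTA AAGCT
        }
    }
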
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
similarity index 75%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
index 3cb2216..128bf9f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.oldtype;
+package edu.uci.ics.genomix.velvet.oldtype;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -22,30 +22,17 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-
 public class NodeWritable implements WritableComparable<NodeWritable>, Serializable {
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
-    public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
-
-    // merge/update directions
-    public static class DirectionFlag {
-        public static final byte DIR_FF = 0b00 << 0;
-        public static final byte DIR_FR = 0b01 << 0;
-        public static final byte DIR_RF = 0b10 << 0;
-        public static final byte DIR_RR = 0b11 << 0;
-        public static final byte DIR_MASK = 0b11 << 0;
-    }
-
     private PositionWritable nodeID;
     private PositionListWritable forwardForwardList;
     private PositionListWritable forwardReverseList;
     private PositionListWritable reverseForwardList;
     private PositionListWritable reverseReverseList;
-    private VKmerBytesWritable kmer;
+    private KmerBytesWritable kmer;
 
     public NodeWritable() {
         this(21);
@@ -57,7 +44,7 @@
         forwardReverseList = new PositionListWritable();
         reverseForwardList = new PositionListWritable();
         reverseReverseList = new PositionListWritable();
-        kmer = new VKmerBytesWritable();
+        kmer = new KmerBytesWritable(kmerSize);
     }
 
     public NodeWritable(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
@@ -68,17 +55,7 @@
         forwardReverseList.set(FRList);
         reverseForwardList.set(RFList);
         reverseReverseList.set(RRList);
-        kmer.setAsCopy(kmer);
-    }
-
-    public void set(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
-            PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
-        this.nodeID.set(nodeID);
-        this.forwardForwardList.set(FFList);
-        this.forwardReverseList.set(FRList);
-        this.reverseForwardList.set(RFList);
-        this.reverseReverseList.set(RRList);
-        this.kmer.setAsCopy(kmer);
+        this.kmer.set(kmer);
     }
 
     public void setNodeID(PositionWritable ref) {
@@ -90,7 +67,7 @@
     }
 
     public void setKmer(KmerBytesWritable right) {
-        this.kmer.setAsCopy(right);
+        this.kmer.set(right);
     }
 
     public void reset(int kmerSize) {
@@ -118,21 +95,6 @@
         return reverseReverseList;
     }
 
-    public PositionListWritable getListFromDir(byte dir) {
-        switch (dir & DirectionFlag.DIR_MASK) {
-            case DirectionFlag.DIR_FF:
-                return getFFList();
-            case DirectionFlag.DIR_FR:
-                return getFRList();
-            case DirectionFlag.DIR_RF:
-                return getRFList();
-            case DirectionFlag.DIR_RR:
-                return getRRList();
-            default:
-                throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
-        }
-    }
-
     public PositionWritable getNodeID() {
         return nodeID;
     }
@@ -148,13 +110,13 @@
     public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
         this.forwardForwardList.set(nextNode.forwardForwardList);
         this.forwardReverseList.set(nextNode.forwardReverseList);
-        kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
+        kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
     }
 
     public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
         this.reverseForwardList.set(preNode.reverseForwardList);
         this.reverseReverseList.set(preNode.reverseReverseList);
-        kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
+        kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
     }
 
     public void set(NodeWritable node) {
@@ -163,7 +125,7 @@
         this.forwardReverseList.set(node.forwardReverseList);
         this.reverseForwardList.set(node.reverseForwardList);
         this.reverseReverseList.set(node.reverseReverseList);
-        this.kmer.setAsCopy(node.kmer);
+        this.kmer.set(node.kmer);
     }
 
     @Override
@@ -211,13 +173,13 @@
     @Override
     public String toString() {
         StringBuilder sbuilder = new StringBuilder();
-        sbuilder.append('{');
+        sbuilder.append('(');
         sbuilder.append(nodeID.toString()).append('\t');
         sbuilder.append(forwardForwardList.toString()).append('\t');
         sbuilder.append(forwardReverseList.toString()).append('\t');
         sbuilder.append(reverseForwardList.toString()).append('\t');
         sbuilder.append(reverseReverseList.toString()).append('\t');
-        sbuilder.append(kmer.toString()).append('}');
+        sbuilder.append(kmer.toString()).append(')');
         return sbuilder.toString();
     }
 
@@ -236,8 +198,4 @@
         return inDegree() == 1 && outDegree() == 1;
     }
 
-    public boolean isSimpleOrTerminalPath() {
-        return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
-    }
-
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
new file mode 100644
index 0000000..b6c42c2
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    protected byte[] storage;
+    protected int offset;
+    protected int valueCount;
+    protected static final byte[] EMPTY = {};
+    public static final int INTBYTES = 4;
+    
+    protected PositionWritable posIter = new PositionWritable();
+
+    public PositionListWritable() {
+        this.storage = EMPTY;
+        this.valueCount = 0;
+        this.offset = 0;
+    }
+
+    public PositionListWritable(int count, byte[] data, int offset) {
+        setNewReference(count, data, offset);
+    }
+    
+    public PositionListWritable(List<PositionWritable> posns) {
+        this();
+        for (PositionWritable p : posns) {
+            append(p);
+        }
+    }
+
+    public void setNewReference(int count, byte[] data, int offset) {
+        this.valueCount = count;
+        this.storage = data;
+        this.offset = offset;
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+
+    protected int getCapacity() {
+        return storage.length - offset;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (storage.length - offset > 0) {
+                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+            }
+            storage = new_data;
+            offset = 0;
+        }
+    }
+
+    public PositionWritable getPosition(int i) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such positions");
+        }
+        posIter.setNewReference(storage, offset + i * PositionWritable.LENGTH);
+        return posIter;
+    }
+
+    public void resetPosition(int i, int readID, byte posInRead) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such positions");
+        }
+        Marshal.putInt(readID, storage, offset + i * PositionWritable.LENGTH);
+        storage[offset + i * PositionWritable.LENGTH + INTBYTES] = posInRead;
+    }
+    
+    @Override
+    public Iterator<PositionWritable> iterator() {
+        Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
+
+            private int currentIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public PositionWritable next() {
+                return getPosition(currentIndex++);
+            }
+
+            @Override
+            public void remove() {
+            }
+        };
+        return it;
+    }
+
+    public void set(PositionListWritable list2) {
+        set(list2.valueCount, list2.storage, list2.offset);
+    }
+
+    public void set(int valueCount, byte[] newData, int offset) {
+        this.valueCount = valueCount;
+        setSize(valueCount * PositionWritable.LENGTH);
+        if (valueCount > 0) {
+            System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionWritable.LENGTH);
+        }
+    }
+
+    public void reset() {
+        valueCount = 0;
+    }
+
+    public void append(PositionWritable pos) {
+        setSize((1 + valueCount) * PositionWritable.LENGTH);
+        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+                * PositionWritable.LENGTH, pos.getLength());
+        valueCount += 1;
+    }
+
+    public void append(int readID, byte posInRead) {
+        setSize((1 + valueCount) * PositionWritable.LENGTH);
+        Marshal.putInt(readID, storage, offset + valueCount * PositionWritable.LENGTH);
+        storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
+        valueCount += 1;
+    }
+    
+    public static int getCountByDataLength(int length) {
+        if (length % PositionWritable.LENGTH != 0) {
+            for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+                System.out.println(ste);
+            }
+            throw new IllegalArgumentException("Length of positionlist is invalid");
+        }
+        return length / PositionWritable.LENGTH;
+    }
+
+    public int getCountOfPosition() {
+        return valueCount;
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return valueCount * PositionWritable.LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt();
+        setSize(valueCount * PositionWritable.LENGTH);
+        in.readFully(storage, offset, valueCount * PositionWritable.LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(storage, offset, valueCount * PositionWritable.LENGTH);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('[');
+        for (PositionWritable pos : this) {
+            sbuilder.append(pos.toString());
+            sbuilder.append(',');
+        }
+        if (valueCount > 0) {
+            sbuilder.setCharAt(sbuilder.length() - 1, ']');
+        } else {
+            sbuilder.append(']');
+        }
+        return sbuilder.toString();
+    }
+    
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PositionListWritable))
+            return false;
+        PositionListWritable other = (PositionListWritable) o;
+        if (this.valueCount != other.valueCount)
+            return false;
+        for (int i = 0; i < this.valueCount; i++) {
+            if (!this.getPosition(i).equals(other.getPosition(i)))
+                return false;
+        }
+        return true;
+    }
+}
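
Each entry occupies PositionWritable.LENGTH = 5 bytes, and getPosition/iterator return the single shared posIter instance rather than fresh objects, so an element must be copied if it needs to outlive the next iteration step. A hypothetical usage sketch:

    import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
    import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;

    public class PositionListDemo {
        public static void main(String[] args) {
            PositionListWritable list = new PositionListWritable();
            list.append(42, (byte) 7); // readID 42, position 7 within the read
            list.append(43, (byte) 0);

            // The iterator reuses one PositionWritable (posIter), so copy
            // each element before advancing if it must be retained.
            for (PositionWritable p : list) {
                PositionWritable copy = new PositionWritable(p.getReadID(), p.getPosInRead());
                System.out.println(copy); // (42,7) then (43,0)
            }
        }
    }
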
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
new file mode 100644
index 0000000..1d509bb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    protected byte[] storage;
+    protected int offset;
+    public static final int LENGTH = 5;
+    public static final int INTBYTES = 4;
+
+    public PositionWritable() {
+        storage = new byte[LENGTH];
+        offset = 0;
+    }
+
+    public PositionWritable(int readID, byte posInRead) {
+        this();
+        set(readID, posInRead);
+    }
+
+    public PositionWritable(byte[] storage, int offset) {
+        setNewReference(storage, offset);
+    }
+
+    public void setNewReference(byte[] storage, int offset) {
+        this.storage = storage;
+        this.offset = offset;
+    }
+
+    public void set(PositionWritable pos) {
+        set(pos.getReadID(), pos.getPosInRead());
+    }
+
+    public void set(int readID, byte posInRead) {
+        Marshal.putInt(readID, storage, offset);
+        storage[offset + INTBYTES] = posInRead;
+    }
+
+    public int getReadID() {
+        return Marshal.getInt(storage, offset);
+    }
+
+    public byte getPosInRead() {
+        return storage[offset + INTBYTES];
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return LENGTH;
+    }
+
+    public boolean isSameReadID(PositionWritable other) {
+        return getReadID() == other.getReadID();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        in.readFully(storage, offset, LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(storage, offset, LENGTH);
+    }
+
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PositionWritable))
+            return false;
+        PositionWritable other = (PositionWritable) o;
+        return this.getReadID() == other.getReadID() && this.getPosInRead() == other.getPosInRead();
+    }
+
+    @Override
+    public int compareTo(PositionWritable other) {
+        int diff1 = this.getReadID() - other.getReadID();
+        if (diff1 == 0) {
+            int diff2 = Math.abs((int) this.getPosInRead()) - Math.abs((int) other.getPosInRead());
+            if (diff2 == 0) {
+                return this.getPosInRead() - other.getPosInRead();
+            }
+            return diff2;
+        }
+        return diff1;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + Integer.toString(getReadID()) + "," + Integer.toString((int) getPosInRead()) + ")";
+    }
+
+    /** A Comparator optimized for PositionWritable. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(PositionWritable.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            int thisValue = Marshal.getInt(b1, s1);
+            int thatValue = Marshal.getInt(b2, s2);
+            int diff1 = thisValue - thatValue;
+            if (diff1 == 0) {
+                int diff2 = Math.abs((int) b1[s1 + INTBYTES]) - Math.abs((int) b2[s2 + INTBYTES]);
+                if (diff2 == 0) {
+                    return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+                }
+                return diff2;
+            }
+            return diff1;
+        }
+    }
+
+    public static class FirstComparator implements RawComparator<PositionWritable> {
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return WritableComparator.compareBytes(b1, s1, l1 - 1, b2, s2, l2 - 1);
+        }
+
+        @Override
+        public int compare(PositionWritable o1, PositionWritable o2) {
+            int l = o1.getReadID();
+            int r = o2.getReadID();
+            return l == r ? 0 : (l < r ? -1 : 1);
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(PositionWritable.class, new Comparator());
+    }
+}
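
The record is LENGTH = 5 bytes: a 4-byte readID written by Marshal.putInt followed by a 1-byte posInRead. FirstComparator passes l - 1 to compareBytes, dropping the trailing position byte so that records compare by readID alone, which groups all positions of one read in a secondary sort. A minimal layout sketch, assuming Marshal.putInt writes big-endian (an assumption; the real byte order is whatever Marshal implements):

    public class PositionLayoutDemo {
        public static void main(String[] args) {
            byte[] storage = new byte[5]; // PositionWritable.LENGTH
            int readID = 42;
            storage[0] = (byte) (readID >>> 24); // big-endian int, standing in for Marshal.putInt
            storage[1] = (byte) (readID >>> 16);
            storage[2] = (byte) (readID >>> 8);
            storage[3] = (byte) readID;
            storage[4] = 7; // posInRead
            System.out.println(java.util.Arrays.toString(storage)); // [0, 0, 0, 42, 7]
        }
    }
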
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
deleted file mode 100644
index d14405e..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
-
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-    private PositionListWritable nodeIdList = new PositionListWritable();
-    private KmerListWritable forwardForwardList = new KmerListWritable(kmerSize);// how do we get kmerSize?
-    private KmerListWritable forwardReverseList = new KmerListWritable(kmerSize);
-    private KmerListWritable reverseForwardList = new KmerListWritable(kmerSize);
-    private KmerListWritable reverseReverseList = new KmerListWritable(kmerSize);
-    private KmerBytesWritable kmer  = new KmerBytesWritable(kmerSize);
-    
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
-        return new IAggregatorDescriptor() {
-            private NodeWritable nodeAggreter = new NodeWritable();// TODO could this be created outside the anonymous class?
-
-            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
-                return offset;
-            }
-
-            @Override
-            public void reset() {
-            }
-
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public AggregateState createAggregateStates() {
-                return new AggregateState(new NodeWritable());
-            }
-
-            @Override
-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                NodeWritable inputVal = (NodeWritable) state.state;
-                inputVal.reset(kmerSize);
-                nodeIdList.reset();
-                forwardForwardList.reset(kmerSize);
-                forwardReverseList.reset(kmerSize);
-                reverseForwardList.reset(kmerSize);
-                reverseReverseList.reset(kmerSize);
-                
-                kmer.setAsCopy(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO confirm that field indices start at 1
-                nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
-                int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));
-                forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
-                int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
-                forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
-                int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
-                reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
-                int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
-                reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
-                nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
-                
-                inputVal.getKmer().setAsCopy(kmer);
-                inputVal.getNodeIdList().appendList(nodeIdList);
-                inputVal.getFFList().appendList(forwardForwardList);
-                inputVal.getFRList().appendList(forwardReverseList);
-                inputVal.getRFList().appendList(reverseForwardList);
-                inputVal.getRRList().appendList(reverseReverseList);
-                
-                // make an empty field
-                tupleBuilder.addFieldEndOffset();// TODO why is this empty field required?
-            }
-
-            @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                NodeWritable inputVal = (NodeWritable) state.state;
-                kmer.setAsCopy(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO confirm that field indices start at 1
-                nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
-                int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));
-                forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
-                int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
-                forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
-                int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
-                reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
-                int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
-                reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
-                nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
-                
-                inputVal.getKmer().setAsCopy(kmer);
-                inputVal.getNodeIdList().appendList(nodeIdList);
-                inputVal.getFFList().appendList(forwardForwardList);
-                inputVal.getFRList().appendList(forwardReverseList);
-                inputVal.getRFList().appendList(reverseForwardList);
-                inputVal.getRRList().appendList(reverseReverseList);
-            }
-
-            @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                throw new IllegalStateException("partial result method should not be called");
-            }
-
-            @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                DataOutput fieldOutput = tupleBuilder.getDataOutput();
-                NodeWritable inputVal = (NodeWritable) state.state;
-                try {
-//                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
-
-                    tupleBuilder.addFieldEndOffset();// TODO why is this leading empty field needed?
-                    //-------------------------------------------------------
-//                    tupleBuilder.reset();
-                    fieldOutput.write(inputVal.getKmer().getBytes(), inputVal.getKmer().getOffset(), inputVal.getKmer().getLength());
-                    
-                    tupleBuilder.addField(inputVal.getNodeIdList().getByteArray(), inputVal.getNodeIdList().getStartOffset(), inputVal.getNodeIdList().getLength());
-
-                    tupleBuilder.getDataOutput().writeInt(inputVal.getFFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-
-                    tupleBuilder.addField(inputVal.getFFList().getByteArray(), inputVal.getFFList().getStartOffset(), inputVal.getFFList()
-                            .getLength());
-
-                    tupleBuilder.getDataOutput().writeInt(inputVal.getFRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-
-                    tupleBuilder.addField(inputVal.getFRList().getByteArray(), inputVal.getFRList().getStartOffset(), inputVal.getFRList()
-                            .getLength());
-
-                    tupleBuilder.getDataOutput().writeInt(inputVal.getRFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-
-                    tupleBuilder.addField(inputVal.getRFList().getByteArray(), inputVal.getRFList().getStartOffset(), inputVal.getRFList()
-                            .getLength());
-
-                    tupleBuilder.getDataOutput().writeInt(inputVal.getRRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-
-                    tupleBuilder.addField(inputVal.getRRList().getByteArray(), inputVal.getRRList().getStartOffset(), inputVal.getRRList()
-                            .getLength());
-
-/*                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy a record into a frame: the record size is too large.");
-                        }
-                    }*/
-
-                } catch (IOException e) {
-                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
-                }
-            }
-
-        };
-    }
-
-}
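
Note: the deleted aggregator and the parser's InsertToFrame (further below) share one tuple layout: after the kmer and node-id fields, each adjacency list travels as a pair of fields, an int element count followed by the list's raw bytes. A minimal sketch of decoding one such pair, assuming Marshal.getInt reads an int at the given byte offset (as the import above suggests):

    // Sketch: read one (count, bytes) adjacency-list field pair from a frame.
    byte[] frame = accessor.getBuffer().array();
    int ffCount = Marshal.getInt(frame, getOffSet(accessor, tIndex, 3));                // count field
    forwardForwardList.setNewReference(ffCount, frame, getOffSet(accessor, tIndex, 4)); // bytes field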
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
deleted file mode 100644
index 76ca95a..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
-import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
-import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
-import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
-import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
-import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
-import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class Driver {
-    public static enum Plan {
-        CHECK_KMERREADER,
-        OUTPUT_KMERHASHTABLE,
-        OUTPUT_MAP_KMER_TO_READ,
-        OUTPUT_GROUPBY_READID,
-        BUILD_DEBRUJIN_GRAPH,
-        BUILD_UNMERGED_GRAPH,
-    }
-
-    private static final String IS_PROFILING = "genomix.driver.profiling";
-    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
-    private static final Log LOG = LogFactory.getLog(Driver.class);
-    private JobGen jobGen;
-    private boolean profiling;
-
-    private int numPartitionPerMachine;
-
-    private IHyracksClientConnection hcc;
-    private Scheduler scheduler;
-
-    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
-        try {
-            hcc = new HyracksConnection(ipAddress, port);
-            scheduler = new Scheduler(hcc.getNodeControllerInfos());
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-        this.numPartitionPerMachine = numPartitionPerMachine;
-    }
-
-    public void runJob(GenomixJobConf job) throws HyracksException {
-        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
-    }
-
-    public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
-        /** add hadoop configurations */
-        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-        job.addResource(hadoopCore);
-        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-        job.addResource(hadoopMapRed);
-        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-        job.addResource(hadoopHdfs);
-
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
-
-        this.profiling = profiling;
-        try {
-            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
-            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
-            switch (planChoice) {
-                case BUILD_DEBRUJIN_GRAPH:
-                default:
-                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-                case OUTPUT_KMERHASHTABLE:
-                    jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-                case OUTPUT_MAP_KMER_TO_READ:
-                    jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-                case OUTPUT_GROUPBY_READID:
-                    jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-                case CHECK_KMERREADER:
-                    jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-                case BUILD_UNMERGED_GRAPH:
-                    jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
-            }
-
-            start = System.currentTimeMillis();
-            run(jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("result writing finished " + time + "ms");
-            LOG.info("job finished");
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    private void run(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification createJob = jobGen.generateJob();
-            execute(createJob);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    private void execute(JobSpecification job) throws Exception {
-        job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc
-                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
-    }
-
-    public static void main(String[] args) throws Exception {
-        GenomixJobConf jobConf = new GenomixJobConf();
-        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
-        if (otherArgs.length < 4) {
-            System.err.println("Need <serverIP> <port> <input> <output>");
-            System.exit(-1);
-        }
-        String ipAddress = otherArgs[0];
-        int port = Integer.parseInt(otherArgs[1]);
-        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
-        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
-        // FileInputFormat.setInputPaths(job, otherArgs[2]);
-        {
-            @SuppressWarnings("deprecation")
-            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
-            jobConf.set("mapred.input.dir", path.toString());
-
-            @SuppressWarnings("deprecation")
-            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
-            jobConf.set("mapred.output.dir", outputDir.toString());
-        }
-        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
-        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
-        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
-        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
-    }
-}
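
Note: the deleted Driver was a thin CLI wrapper around the JobGen* classes. A hedged sketch of equivalent programmatic use, mirroring its main(); the host, port, and paths below are placeholders, not values from this patch:

    // Sketch: submit the graph-building job without going through main().
    GenomixJobConf jobConf = new GenomixJobConf();
    jobConf.set("mapred.input.dir", "/path/to/reads");   // hypothetical HDFS input
    jobConf.set("mapred.output.dir", "/path/to/graph");  // hypothetical HDFS output
    Driver driver = new Driver("127.0.0.1", 3099, 2);    // ipAddress, port, partitions per machine
    driver.runJob(jobConf, Driver.Plan.BUILD_DEBRUJIN_GRAPH, false);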
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
deleted file mode 100644
index e280a7c..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
-import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
-import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-@SuppressWarnings("deprecation")
-public class JobGenBrujinGraph extends JobGen {
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    public enum GroupbyType {
-        EXTERNAL,
-        PRECLUSTER,
-        HYBRIDHASH,
-    }
-
-    public enum OutputFormat {
-        TEXT,
-        BINARY,
-    }
-
-    protected ConfFactory hadoopJobConfFactory;
-    protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-    protected String[] ncNodeNames;
-    protected String[] readSchedule;
-
-    protected int readLength;
-    protected int kmerSize;
-    protected int frameLimits;
-    protected int frameSize;
-    protected int tableSize;
-    protected GroupbyType groupbyType;
-    protected OutputFormat outputFormat;
-    protected boolean bGenerateReversedKmer;
-
-    protected void logDebug(String status) {
-        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
-    }
-
-    public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
-            int numPartitionPerMachine) throws HyracksDataException {
-        super(job);
-        String[] nodes = new String[ncMap.size()];
-        ncMap.keySet().toArray(nodes);
-        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-        for (int i = 0; i < numPartitionPerMachine; i++) {
-            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
-        }
-        initJobConfiguration(scheduler);
-    }
-
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
-            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
-            IPointableFactory pointable, RecordDescriptor outRed) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
-                aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
-                        tableSize), true);
-    }
-
-    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
-            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
-            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
-            throws HyracksDataException {
-
-        Object[] obj = new Object[3];
-
-        switch (groupbyType) {
-            case EXTERNAL:
-                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
-                        combineRed);
-                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
-                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
-                        finalRec);
-                break;
-            case PRECLUSTER:
-            default:
-
-                obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
-                        combineRed);
-                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
-                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
-                        finalRec);
-                jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-                break;
-        }
-        return obj;
-    }
-
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
-                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
-
-            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
-                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
-                            kmerSize, bGenerateReversedKmer));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
-            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
-        jobSpec.connect(conn, preOp, 0, nextOp, 0);
-    }
-
-    public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
-            AbstractOperatorDescriptor readOperator) throws HyracksDataException {
-        int[] keyFields = new int[] { 0 }; // the id of grouped key 
-
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                ReadsKeyValueParserFactory.readKmerOutputRec);
-        connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
-                jobSpec));
-
-        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null, null, null, null});
-        jobSpec.setFrameSize(frameSize);
-
-        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
-                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
-                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
-        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
-        logDebug("LocalKmerGroupby Operator");
-        connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-
-        logDebug("CrossKmerGroupby Operator");
-        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
-        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
-        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
-        return kmerCrossAggregator;
-    }
-
-/*    public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
-            AbstractOperatorDescriptor kmerCrossAggregator) {
-        // Map (Kmer, {(ReadID,PosInRead),...}) into
-        // (ReadID,PosInRead,{OtherPosition,...},Kmer)
-
-        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
-                MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
-        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-        return mapKmerToRead;
-    }*/
-
-/*    public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
-            AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
-        int[] keyFields = new int[] { 0 }; // the id of grouped key
-        // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
-                MapKmerPositionToReadOperator.readIDOutputRec);
-        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
-                jobSpec));
-
-        RecordDescriptor readIDFinalRec = new RecordDescriptor(
-                new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
-        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
-                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
-                IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
-        AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
-        connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-
-        logDebug("Group by ReadID merger");
-        IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
-        AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
-        connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
-        return readCrossAggregator;
-    }*/
-
-/*    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
-            AbstractOperatorDescriptor readCrossAggregator) {
-        // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
-        // OutgoingList, Kmer)
-
-        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
-                MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
-        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-        return mapEachReadToNode;
-    }
-
-    public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
-            AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
-        // Output Kmer
-        ITupleWriterFactory kmerWriter = null;
-        switch (outputFormat) {
-            case TEXT:
-                kmerWriter = new KMerTextWriterFactory(kmerSize);
-                break;
-            case BINARY:
-            default:
-                kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
-                break;
-        }
-        logDebug("WriteOperator");
-        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
-                hadoopJobConfFactory.getConf(), kmerWriter);
-        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-        return writeKmerOperator;
-    }
-
-    public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
-            AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
-        ITupleWriterFactory nodeWriter = null;
-        switch (outputFormat) {
-            case TEXT:
-                nodeWriter = new NodeTextWriterFactory(kmerSize);
-                break;
-            case BINARY:
-            default:
-                nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
-                break;
-        }
-        logDebug("WriteOperator");
-        // Output Node
-        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
-                hadoopJobConfFactory.getConf(), nodeWriter);
-        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-        return writeNodeOperator;
-    }
-*/
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-
-        JobSpecification jobSpec = new JobSpecification();
-        logDebug("ReadKmer Operator");
-
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
-        logDebug("Group by Kmer");
-        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
-
-        // logDebug("Write kmer to result");
-        // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
-
-//        logDebug("Map Kmer to Read Operator");
-//        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
-
-//        logDebug("Group by Read Operator");
-//        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
-
-/*        logDebug("Generate final node");
-        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
-        logDebug("Write node to result");
-        lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);*/
-
-        jobSpec.addRoot(lastOperator);
-        return jobSpec;
-    }
-
-    protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
-        Configuration conf = confFactory.getConf();
-        readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
-        kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
-        if (kmerSize % 2 == 0) {
-            kmerSize--;
-            conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
-        }
-        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
-
-        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
-        String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
-            groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
-            groupbyType = GroupbyType.PRECLUSTER;
-        } else {
-            groupbyType = GroupbyType.HYBRIDHASH;
-        }
-
-        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
-        if (output.equalsIgnoreCase("text")) {
-            outputFormat = OutputFormat.TEXT;
-        } else {
-            outputFormat = OutputFormat.BINARY;
-        }
-        try {
-            hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
-            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
-                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
-            readSchedule = scheduler.getLocationConstraints(splits);
-        } catch (IOException ex) {
-            throw new HyracksDataException(ex);
-        }
-
-        LOG.info("Genomix Graph Build Configuration");
-        LOG.info("Kmer:" + kmerSize);
-        LOG.info("Groupby type:" + type);
-        LOG.info("Output format:" + output);
-        LOG.info("Frame limit:" + frameLimits);
-        LOG.info("Frame size:" + frameSize);
-    }
-
-}
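
Note: initJobConfiguration above silently decrements an even kmerSize. The reason is worth recording: with even k a k-mer can equal its own reverse complement (e.g. ACGT), which makes the forward/reverse canonical choice ambiguous, while with odd k the middle base would have to be its own complement, which is impossible. A sketch of the same guard with the reasoning spelled out:

    // Sketch: force k odd so no k-mer is its own reverse complement and the
    // canonical-direction choice (forward vs. reverse) is always well defined.
    int kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
    if (kmerSize % 2 == 0) {
        kmerSize--; // round down to the nearest odd k
        conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
    }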
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
deleted file mode 100644
index 3adf8a0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable; // assumed package; type is used below but was not imported
-import edu.uci.ics.genomix.type.ReadIDWritable; // assumed package; type is used below but was not imported
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
-    private static final long serialVersionUID = 1L;
-    private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
-
-    public static final int OutputKmerField = 0;
-    public static final int OutputPosition = 1;
-
-    private final boolean bReversed;
-    private final int readLength;
-    private final int kmerSize;
-
-    public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-            null, null, null, null, null, null});
-
-    public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
-        bReversed = bGenerateReversed;
-        this.readLength = readlength;
-        this.kmerSize = k;
-    }
-
-    public static enum KmerDir {
-        FORWARD,
-        REVERSE,
-    }
-
-    @Override
-    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputBuffer, true);
-
-        return new IKeyValueParser<LongWritable, Text>() {
-
-            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
-            private PositionReference pos = new PositionReference();
-
-            private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
-            private IntermediateNodeWritable outputNode = new IntermediateNodeWritable();
-            private ReadIDWritable readId = new ReadIDWritable();
-            private KmerListWritable kmerList = new KmerListWritable();
-
-            private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(kmerSize);
-            private KmerDir preKmerDir = KmerDir.FORWARD;
-            private KmerDir curKmerDir = KmerDir.FORWARD;
-            private KmerDir nextKmerDir = KmerDir.FORWARD;
-
-            byte mateId = (byte) 0;
-
-            @Override
-            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
-                String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
-                if (geneLine.length != 2) {
-                    return;
-                }
-                int readID = 0;
-                try {
-                    readID = Integer.parseInt(geneLine[0]);
-                } catch (NumberFormatException e) {
-                    LOG.warn("Invalid data ");
-                    return;
-                }
-
-                Pattern genePattern = Pattern.compile("[AGCT]+");
-                Matcher geneMatcher = genePattern.matcher(geneLine[1]);
-                boolean isValid = geneMatcher.matches();
-                if (isValid) {
-                    if (geneLine[1].length() != readLength) {
-                        LOG.warn("Invalid readlength at: " + readID);
-                        return;
-                    }
-                    SplitReads(readID, geneLine[1].getBytes(), writer);
-                }
-            }
-
-            private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
-/*                
-                if (kmerSize >= array.length) {
-                    return;
-                }
-                kmer.setByRead(array, 0);
-                InsertToFrame(kmer, readID, 1, writer);
-
-                
-                for (int i = kmerSize; i < array.length; i++) {
-                    kmer.shiftKmerWithNextChar(array[i]);
-                    InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
-                }
-
-                if (bReversed) {
-                    kmer.setByReadReverse(array, 0);
-                    InsertToFrame(kmer, readID, -1, writer);
-                    for (int i = kmerSize; i < array.length; i++) {
-                        kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
-                        InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
-                    }
-                }*/
-                ///////////////////////////////////////
-                if (kmerSize >= array.length) {
-                    return;
-                }
-                /** first kmer **/
-                curForwardKmer.setByRead(array, 0);
-                curReverseKmer.setAsCopy(kmerFactory.reverse(curForwardKmer));
-                curKmerDir = curForwardKmer.compareTo(curReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
-                setNextKmer(array[kmerSize]);
-                readId.setAsCopy(mateId, readID);
-                outputNode.setreadId(readId);
-                setEdgeListForNextKmer();
-                switch (curKmerDir) {
-                    case FORWARD:
-                        InsertToFrame(curForwardKmer, outputNode, writer);
-                        break;
-                    case REVERSE:
-                        InsertToFrame(curReverseKmer, outputNode, writer);
-                        break;
-                }
-                /** middle kmer **/
-                for (int i = kmerSize + 1; i < array.length; i++) {
-                    setPreKmerByOldCurKmer();
-                    setCurKmerByOldNextKmer();
-                    setNextKmer(array[i]);
-                    //set value.readId
-                    readId.setAsCopy(mateId, readID);
-                    outputNode.setreadId(readId);
-                    //set value.edgeList
-                    setEdgeListForPreKmer();
-                    setEdgeListForNextKmer();
-                    //output mapper result
-                    switch (curKmerDir) {
-                        case FORWARD:
-                            InsertToFrame(curForwardKmer, outputNode, writer);
-                            break;
-                        case REVERSE:
-                            InsertToFrame(curReverseKmer, outputNode, writer);
-                            break;
-                    }
-                }
-                /** last kmer **/
-                setPreKmerByOldCurKmer();
-                setCurKmerByOldNextKmer();
-                //set value.readId
-                readId.setAsCopy(mateId, readID);
-                outputNode.setreadId(readId);
-                //set value.edgeList
-                setEdgeListForPreKmer();
-                //output mapper result
-                switch (curKmerDir) {
-                    case FORWARD:
-                        InsertToFrame(curForwardKmer, outputNode, writer);
-                        break;
-                    case REVERSE:
-                        InsertToFrame(curReverseKmer, outputNode, writer);
-                        break;
-                }
-            }
-
-            public void setPreKmer(byte preChar) {
-                preForwardKmer.setAsCopy(curForwardKmer);
-                preForwardKmer.shiftKmerWithPreChar(preChar);
-                preReverseKmer.setAsCopy(kmerFactory.reverse(preForwardKmer));
-                preKmerDir = preForwardKmer.compareTo(preReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
-            }
-            
-            public void setNextKmer(byte nextChar) {
-                nextForwardKmer.setAsCopy(curForwardKmer);
-                nextForwardKmer.shiftKmerWithNextChar(nextChar);
-                nextReverseKmer.setAsCopy(kmerFactory.reverse(nextForwardKmer));
-                nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
-            }
-
-            public void setPreKmerByOldCurKmer() {
-                preKmerDir = curKmerDir;
-                preForwardKmer.setAsCopy(curForwardKmer);
-                preReverseKmer.setAsCopy(curReverseKmer);
-            }
-
-            // the old nextKmer becomes the new curKmer
-            public void setCurKmerByOldNextKmer() {
-                curKmerDir = nextKmerDir;
-                curForwardKmer.setAsCopy(nextForwardKmer);
-                curReverseKmer.setAsCopy(nextReverseKmer);
-            }
-
-            public void setEdgeListForNextKmer() {
-                switch (curKmerDir) {
-                    case FORWARD:
-                        switch (nextKmerDir) {
-                            case FORWARD:
-                                kmerList.reset();
-                                kmerList.append(nextForwardKmer);
-                                outputNode.setFFList(kmerList);
-                                break;
-                            case REVERSE:
-                                kmerList.reset();
-                                kmerList.append(nextReverseKmer);
-                                outputNode.setFRList(kmerList);
-                                break;
-                        }
-                        break;
-                    case REVERSE:
-                        switch (nextKmerDir) {
-                            case FORWARD:
-                                kmerList.reset();
-                                kmerList.append(nextForwardKmer);
-                                outputNode.setRFList(kmerList);
-                                break;
-                            case REVERSE:
-                                kmerList.reset();
-                                kmerList.append(nextReverseKmer);
-                                outputNode.setRRList(kmerList);
-                                break;
-                        }
-                        break;
-                }
-            }
-
-            public void setEdgeListForPreKmer() {
-                switch (curKmerDir) {
-                    case FORWARD:
-                        switch (preKmerDir) {
-                            case FORWARD:
-                                kmerList.reset();
-                                kmerList.append(preForwardKmer);
-                                outputNode.setRRList(kmerList);
-                                break;
-                            case REVERSE:
-                                kmerList.reset();
-                                kmerList.append(preReverseKmer);
-                                outputNode.setRFList(kmerList);
-                                break;
-                        }
-                        break;
-                    case REVERSE:
-                        switch (preKmerDir) {
-                            case FORWARD:
-                                kmerList.reset();
-                                kmerList.append(preForwardKmer);
-                                outputNode.setFRList(kmerList);
-                                break;
-                            case REVERSE:
-                                kmerList.reset();
-                                kmerList.append(preReverseKmer);
-                                outputNode.setFFList(kmerList);
-                                break;
-                        }
-                        break;
-                }
-            }
-
-            private void InsertToFrame(KmerBytesWritable kmer, IntermediateNodeWritable Node, IFrameWriter writer) {
-                try {
-                    tupleBuilder.reset();
-                    tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-                    
-                    tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
-                    
-                    tupleBuilder.getDataOutput().writeInt(Node.getFFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
-                            .getLength());
-                    
-                    tupleBuilder.getDataOutput().writeInt(Node.getFRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
-                            .getLength());
-                    
-                    tupleBuilder.getDataOutput().writeInt(Node.getRFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
-                            .getLength());
-                    
-                    tupleBuilder.getDataOutput().writeInt(Node.getRRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
-                            .getLength());
-
-                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy a record into a frame: the record size is too large.");
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-
-            @Override
-            public void open(IFrameWriter writer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-        };
-    }
-
-}
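
Note: throughout the deleted parser, every k-mer is stored under its canonical direction: whichever of the forward k-mer and its reverse complement compares larger wins, so both strands of a read land on the same graph node. The rule reduces to a single comparison; a minimal sketch using the writable types from the file above:

    // Sketch: pick the canonical direction for a kmer and its reverse complement.
    static KmerDir canonicalDir(KmerBytesWritable forward, KmerBytesWritable reverse) {
        return forward.compareTo(reverse) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
    }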
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 8f7a69e..60c0682 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -15,7 +15,7 @@
 
 package edu.uci.ics.genomix.hyracks.data.primitive;
 
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
 
 public class NodeReference extends NodeWritable {
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index f376f71..1827651 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -19,9 +19,11 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -186,7 +188,7 @@
                 throw new IllegalArgumentException("kmer kmerByteSize is invalid");
             }
             offset += INT_LENGTH;
-            kmer.setAsCopy(buffer.array(), offset);
+            kmer.set(buffer.array(), offset);
         }
 
         private void connect(NodeReference curNode, NodeReference nextNode) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 9aea9ad..2134177 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -25,8 +25,8 @@
 import org.apache.hadoop.io.Text;
 
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.GeneCode;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index aabdb3f..def046b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -25,9 +25,9 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -70,7 +70,7 @@
                 if (reEnterKey.getLength() > tuple.getFieldLength(InputKmerField)) {
                     throw new IllegalArgumentException("Not enough kmer bytes");
                 }
-                reEnterKey.setAsReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
+                reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
                 int countOfPos = tuple.getFieldLength(InputPositionListField) / PositionWritable.LENGTH;
                 if (tuple.getFieldLength(InputPositionListField) % PositionWritable.LENGTH != 0) {
                     throw new IllegalArgumentException("Invalid count of position byte");
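Note: from here on, several hunks apply the same mechanical rename, since the velvet oldtype writables expose setNewReference where the previous package used setAsReference. A minimal sketch of the call-site change, using the two-argument form seen above (field is a placeholder for the concrete field constant):

    // old: kmer.setAsReference(bytes, offset)
    kmer.setNewReference(tuple.getFieldData(field), tuple.getFieldStart(field));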
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 91f10be..652a6f2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -17,9 +17,9 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -49,7 +49,7 @@
                 if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
                     throw new IllegalArgumentException("Not enough kmer bytes");
                 }
-                kmer.setAsReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
                         tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
                 int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
                         / PositionWritable.LENGTH;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 1651d4a..e116ab9 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -27,8 +27,8 @@
 import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -94,7 +94,7 @@
             node.getRRList().setNewReference(tuple.getFieldLength(InputRRField) / PositionWritable.LENGTH,
                     tuple.getFieldData(InputRRField), tuple.getFieldStart(InputRRField));
 
-            node.getKmer().setAsReference(
+            node.getKmer().setNewReference(
                     Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
                             tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
                     tuple.getFieldData(InputKmerBytesField), tuple.getFieldStart(InputKmerBytesField));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index 3ca4a9d..bc00aa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -18,8 +18,8 @@
 import java.io.IOException;
 
 import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -69,7 +69,7 @@
                         tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
                         tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
 
-                node.getKmer().setAsReference(
+                node.getKmer().setNewReference(
                         Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
                                 tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
                         tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index 07d16f8..b4b1e73 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -19,8 +19,8 @@
 import java.util.Map;
 
 import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -89,7 +89,7 @@
                                             .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
                                         throw new IllegalArgumentException("Not enough kmer bytes");
                                     }
-                                    kmer.setAsReference(
+                                    kmer.setNewReference(
                                             tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
                                             tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
                                     pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index fa5360c..1e78b79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -19,8 +19,8 @@
 import java.util.Map;
 
 import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -121,7 +121,7 @@
                                                     throw new IllegalArgumentException("kmerlength is invalid");
                                                 }
                                                 fieldOffset += 4;
-                                                kmer.setAsReference(buffer, fieldOffset);
+                                                kmer.setNewReference(buffer, fieldOffset);
                                                 fieldOffset += kmer.getLength();
                                                 kmerString = kmer.toString();
                                             }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 09b1680..8e727959 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -20,8 +20,8 @@
 
 import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -93,7 +93,7 @@
                                                 .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
                                             throw new IllegalArgumentException("Not enough kmer bytes");
                                         }
-                                        kmer.setAsReference(
+                                        kmer.setNewReference(
                                                 tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
                                                 tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
                                         kmerString = kmer.toString();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..6919e76
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * Connector policy for the precluster group-by: materialize the send side of
+ * m-to-n merging connectors, pipeline everything else.
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}
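The new policy answers one question per connector: an MToNPartitioningMergingConnectorDescriptor gets its send side materialized, so the pre-clustered group-by it feeds cannot deadlock on back-pressure, while every other connector stays fully pipelined. A minimal usage sketch, mirroring how JobGenBrujinGraph attaches it later in this patch:

    JobSpecification jobSpec = new JobSpecification();
    // every m-to-n partition-merging connector in this spec now buffers on
    // the send side; all other connectors pipeline frame by frame
    jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());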
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 22b2ae7..84f9fe0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -45,21 +46,14 @@
     private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
 
     public static final int OutputKmerField = 0;
-    public static final int outputNodeIdListField = 1;
-    public static final int OutputForwardForwardField = 2;
-    public static final int OutputFFListCountField = 3;
-    public static final int OutputForwardReverseField = 4;
-    public static final int OutputFRListCountField = 5;
-    public static final int OutputReverseForwardField = 6;
-    public static final int OutputRFListCountField = 7;
-    public static final int OutputReverseReverseField = 8;
-    public static final int OutputRRListCountField = 9;
+    public static final int OutputNodeField = 1;
+    
 
     private final int readLength;
     private final int kmerSize;
 
     public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-            null, null, null, null, null, null, null, null});
+            null});
 
     public ReadsKeyValueParserFactory(int readlength, int k) {
         this.readLength = readlength;
@@ -77,21 +71,22 @@
         final ByteBuffer outputBuffer = ctx.allocateFrame();
         final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
         outputAppender.reset(outputBuffer, true);
-
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
         return new IKeyValueParser<LongWritable, Text>() {
-
+            
             private PositionWritable nodeId = new PositionWritable();
             private PositionListWritable nodeIdList = new PositionListWritable();
-            private KmerListWritable edgeListForPreKmer = new KmerListWritable(kmerSize);;
-            private KmerListWritable edgeListForNextKmer = new KmerListWritable(kmerSize);;
-            private NodeWritable outputNode = new NodeWritable(kmerSize);
+            private KmerListWritable edgeListForPreKmer = new KmerListWritable();
+            private KmerListWritable edgeListForNextKmer = new KmerListWritable();
+            private NodeWritable outputNode = new NodeWritable();
+//            private NodeWritable outputNode2 = new NodeWritable();
 
-            private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
-            private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+            private KmerBytesWritable preForwardKmer = new KmerBytesWritable();           
+            private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
+            private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
+            private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
+            private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
+            private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
             
             private KmerDir preKmerDir = KmerDir.FORWARD;
             private KmerDir curKmerDir = KmerDir.FORWARD;
@@ -126,37 +121,37 @@
             }
 
             private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
-                /** first kmer */
+                /* first kmer */
                 if (kmerSize >= array.length) {
                     return;
                 }
-                outputNode.reset(kmerSize);
+                outputNode.reset();
                 curForwardKmer.setByRead(array, 0);
                 curReverseKmer.setByReadReverse(array, 0);
                 curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
                 setNextKmer(array[kmerSize]);
-                setnodeId(mateId, readID, 1);
+                setnodeId(mateId, readID, 0);
                 setEdgeListForNextKmer();
                 writeToFrame(writer);
 
-                /** middle kmer */
+                /* middle kmer */
                 int i = kmerSize;
                 for (; i < array.length - 1; i++) {
-                    outputNode.reset(kmerSize);
+                    outputNode.reset();
                     setPreKmerByOldCurKmer();
                     setCurKmerByOldNextKmer();
                     setNextKmer(array[i]);
-                    setnodeId(mateId, readID, i - kmerSize + 1);
+                    setnodeId(mateId, readID, 0); // was: i - kmerSize + 1
                     setEdgeListForPreKmer();
                     setEdgeListForNextKmer();
                     writeToFrame(writer);
                 }
                 
-                /** last kmer */
-                outputNode.reset(kmerSize);
+                /* last kmer */
+                outputNode.reset();
                 setPreKmerByOldCurKmer();
                 setCurKmerByOldNextKmer();
-                setnodeId(mateId, readID, array.length - kmerSize + 1);
+                setnodeId(mateId, readID, 0); // was: array.length - kmerSize + 1
                 setEdgeListForPreKmer();
                 writeToFrame(writer);
             }
@@ -202,12 +197,12 @@
                     case FORWARD:
                         switch(preKmerDir){
                             case FORWARD:
-                                edgeListForPreKmer.reset(kmerSize);
+                                edgeListForPreKmer.reset();
                                 edgeListForPreKmer.append(preForwardKmer);
                                 outputNode.setRRList(edgeListForPreKmer);
                                 break;
                             case REVERSE:
-                                edgeListForPreKmer.reset(kmerSize);
+                                edgeListForPreKmer.reset();
                                 edgeListForPreKmer.append(preReverseKmer);
                                 outputNode.setRFList(edgeListForPreKmer);
                                 break;
@@ -216,12 +211,12 @@
                     case REVERSE:
                         switch(preKmerDir){
                             case FORWARD:
-                                edgeListForPreKmer.reset(kmerSize);
+                                edgeListForPreKmer.reset();
                                 edgeListForPreKmer.append(preForwardKmer);
                                 outputNode.setFRList(edgeListForPreKmer);
                                 break;
                             case REVERSE:
-                                edgeListForPreKmer.reset(kmerSize);
+                                edgeListForPreKmer.reset();
                                 edgeListForPreKmer.append(preReverseKmer);
                                 outputNode.setFFList(edgeListForPreKmer);
                                 break;
@@ -235,12 +230,12 @@
                     case FORWARD:
                         switch(nextKmerDir){
                             case FORWARD:
-                                edgeListForNextKmer.reset(kmerSize);
+                                edgeListForNextKmer.reset();
                                 edgeListForNextKmer.append(nextForwardKmer);
                                 outputNode.setFFList(edgeListForNextKmer);
                                 break;
                             case REVERSE:
-                                edgeListForNextKmer.reset(kmerSize);
+                                edgeListForNextKmer.reset();
                                 edgeListForNextKmer.append(nextReverseKmer);
                                 outputNode.setFRList(edgeListForNextKmer);
                                 break;
@@ -249,12 +244,12 @@
                     case REVERSE:
                         switch(nextKmerDir){
                             case FORWARD:
-                                edgeListForNextKmer.reset(kmerSize);
+                                edgeListForNextKmer.reset();
                                 edgeListForNextKmer.append(nextForwardKmer);
                                 outputNode.setRFList(edgeListForNextKmer);
                                 break;
                             case REVERSE:
-                                edgeListForNextKmer.reset(kmerSize);
+                                edgeListForNextKmer.reset();
                                 edgeListForNextKmer.append(nextReverseKmer);
                                 outputNode.setRRList(edgeListForNextKmer);
                                 break;
@@ -267,30 +262,7 @@
                 try {
                     tupleBuilder.reset();
                     tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-                    
-                    //tupleBuilder.addField(node.getnodeId().getByteArray(), node.getnodeId().getStartOffset(), node.getnodeId().getLength());
-//                    tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
-//                    tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
-//                    tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
-//                    tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
-
-                    tupleBuilder.addField(node.getNodeIdList().getByteArray(), node.getNodeIdList().getStartOffset(), node.getNodeIdList().getLength());
-                    
-                    tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
-                    tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
-                    tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
-                    tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
-                    
-                    tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
-                    tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
-                    tupleBuilder.addFieldEndOffset();
+                    tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
                     
                     if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                             tupleBuilder.getSize())) {
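The parser's output record thus shrinks from nine adjacency fields to two: field 0 is the kmer, field 1 is the whole NodeWritable serialized with marshalToByteArray(). A hedged sketch of the matching read side, as the writers and aggregators later in this patch consume it (tuple is an ITupleReference):

    NodeWritable node = new NodeWritable();
    // field 1 carries the serialized node; setAsReference wraps the
    // frame bytes in place without copying
    node.setAsReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeField),
            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeField));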
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..46fdd0e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * local (per-partition) aggregate of identical kmers
+     */
+    private static final long serialVersionUID = 1L;
+    private final int kmerSize;
+    
+    public AggregateKmerAggregateFactory(int k) {
+        this.kmerSize = k;
+    }
+    
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
+        return new IAggregatorDescriptor() {
+            
+            private NodeWritable readNode = new NodeWritable();
+            
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new NodeWritable());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                localUniNode.reset();
+                readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+                localUniNode.getFFList().appendList(readNode.getFFList());
+                localUniNode.getFRList().appendList(readNode.getFRList());
+                localUniNode.getRFList().appendList(readNode.getRFList());
+                localUniNode.getRRList().appendList(readNode.getRRList());
+
+                // add an empty value field so the frame layout matches the record descriptor
+                tupleBuilder.addFieldEndOffset();
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+                localUniNode.getFFList().appendList(readNode.getFFList());
+                localUniNode.getFRList().appendList(readNode.getFRList());
+                localUniNode.getRFList().appendList(readNode.getRFList());
+                localUniNode.getRRList().appendList(readNode.getRRList());
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                try {
+                    fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
+
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+        };
+    }
+
+}
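Two conventions in this factory recur through the rest of the patch. First, the reworked KmerBytesWritable keeps the kmer length as class-wide state, so each factory calls setGlobalKmerLength(kmerSize) once before constructing writables. Second, the local aggregator only concatenates lists with appendList(), leaving duplicate elimination to the global merge step. A condensed sketch of the per-group flow under those assumptions (frameBytes and offset stand in for the accessor arithmetic in getOffSet):

    NodeWritable state = new NodeWritable(); // lives in AggregateState, one per kmer
    NodeWritable read = new NodeWritable();  // scratch object, reused per tuple
    state.reset();
    read.setAsReference(frameBytes, offset); // hypothetical: start of field 1
    state.getNodeIdList().appendList(read.getNodeIdList()); // plain concatenation
    state.getFFList().appendList(read.getFFList());         // same for FR/RF/RR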
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
similarity index 60%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index f0f72b9..1ee6cae 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -13,20 +13,19 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
 
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -35,32 +34,47 @@
 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
-
+    
+    private final int kmerSize;
+    
+    public MergeKmerAggregateFactory(int k) {
+        this.kmerSize = k;
+    }
+    
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
         final int frameSize = ctx.getFrameSize();
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
         return new IAggregatorDescriptor() {
 
-            private PositionReference position = new PositionReference();
-
+            private NodeWritable readNode = new NodeWritable();
+            
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+            
             @Override
             public AggregateState createAggregateStates() {
-                return new AggregateState(new ArrayBackedValueStorage());
+                return new AggregateState(new NodeWritable());
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                inputVal.reset();
-                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
-                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
-                        1); offset += PositionReference.LENGTH) {
-                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
-                }
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                localUniNode.reset();
+                readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+                localUniNode.getFFList().unionUpdate(readNode.getFFList());
+                localUniNode.getFRList().unionUpdate(readNode.getFRList());
+                localUniNode.getRFList().unionUpdate(readNode.getRFList());
+                localUniNode.getRRList().unionUpdate(readNode.getRRList());
+                
                 // make a fake field to cheat the caller
                 tupleBuilder.addFieldEndOffset();
             }
@@ -73,13 +87,13 @@
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
-                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
-                        1); offset += PositionReference.LENGTH) {
-                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
-                }
+                NodeWritable localUniNode = (NodeWritable) state.state;
+                readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+                localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+                localUniNode.getFFList().unionUpdate(readNode.getFFList());
+                localUniNode.getFRList().unionUpdate(readNode.getFRList());
+                localUniNode.getRFList().unionUpdate(readNode.getRFList());
+                localUniNode.getRRList().unionUpdate(readNode.getRRList());
             }
 
             @Override
@@ -92,12 +106,12 @@
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                NodeWritable localUniNode = (NodeWritable) state.state;
                 try {
-                    if (inputVal.getLength() > frameSize / 2) {
-                        LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
+                    if (localUniNode.getSerializedLength() > frameSize / 2) {
+                        LOG.warn("MergeKmer: serialized output node is too big: " + localUniNode.getSerializedLength());
                     }
-                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
                     tupleBuilder.addFieldEndOffset();
 
                 } catch (IOException e) {
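The global merger differs from the local combiner in one verb: unionUpdate() instead of appendList(), so adjacency entries arriving from several partitions are kept only once. A hypothetical illustration, assuming KmerListWritable's append and unionUpdate behave as used above (kmerX is a placeholder kmer):

    KmerListWritable a = new KmerListWritable();
    KmerListWritable b = new KmerListWritable();
    a.append(kmerX);  // edge recorded by partition 1
    b.append(kmerX);  // the same edge seen by partition 2
    a.unionUpdate(b); // a now lists kmerX exactly once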
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
index 6d6e1e6..4602ed2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.hyracks.newgraph.driver;
 
 import java.net.URL;
@@ -6,10 +21,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
 
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenBrujinGraph;
 import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -23,19 +40,21 @@
 
 public class Driver {
     public static enum Plan {
+        BUILD_DEBRUJIN_GRAPH,
         CHECK_KMERREADER,
     }
-    
+
     private static final String IS_PROFILING = "genomix.driver.profiling";
     private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
     private static final Log LOG = LogFactory.getLog(Driver.class);
     private JobGen jobGen;
     private boolean profiling;
+
     private int numPartitionPerMachine;
 
     private IHyracksClientConnection hcc;
     private Scheduler scheduler;
-    
+
     public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
         try {
             hcc = new HyracksConnection(ipAddress, port);
@@ -45,9 +64,9 @@
         }
         this.numPartitionPerMachine = numPartitionPerMachine;
     }
-    
+
     public void runJob(GenomixJobConf job) throws HyracksException {
-        runJob(job, Plan.CHECK_KMERREADER, false);
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
     }
 
     public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
@@ -69,8 +88,11 @@
             Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
             LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
             switch (planChoice) {
-                case CHECK_KMERREADER:
+                case BUILD_DEBRUJIN_GRAPH:
                 default:
+                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case CHECK_KMERREADER:
                     jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
                     break;
             }
@@ -85,7 +107,7 @@
             throw new HyracksException(e);
         }
     }
-    
+
     private void run(JobGen jobGen) throws Exception {
         try {
             JobSpecification createJob = jobGen.generateJob();
@@ -95,11 +117,37 @@
             throw e;
         }
     }
-    
+
     private void execute(JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc
-                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        JobId jobId = hcc.startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
     }
+
+    public static void main(String[] args) throws Exception {
+        GenomixJobConf jobConf = new GenomixJobConf();
+        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+        if (otherArgs.length < 4) {
+            System.err.println("Need <serverIP> <port> <input> <output>");
+            System.exit(-1);
+        }
+        String ipAddress = otherArgs[0];
+        int port = Integer.parseInt(otherArgs[1]);
+        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+        // FileInputFormat.setInputPaths(job, otherArgs[2]);
+        {
+            @SuppressWarnings("deprecation")
+            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+            jobConf.set("mapred.input.dir", path.toString());
+
+            @SuppressWarnings("deprecation")
+            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+            jobConf.set("mapred.output.dir", outputDir.toString());
+        }
+        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+    }
 }
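With the new main(), the graph build is launchable directly, and BUILD_DEBRUJIN_GRAPH is now the default plan. A hedged invocation example (jar name and HDFS paths are illustrative):

    java -cp genomix-hyracks.jar \
        edu.uci.ics.genomix.hyracks.newgraph.driver.Driver \
        127.0.0.1 3099 /data/reads /data/graph

Because the arguments pass through GenericOptionsParser, -D flags can set the genomix.driver.profiling and genomix.driver.duplicate.num keys (IS_PROFILING and CPARTITION_PER_MACHINE above) without code changes.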
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..b7b7054
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * Write the node to Text
+     */
+    private static final long serialVersionUID = 1L;
+    private final int kmerSize;
+    public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
+    public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+    
+    public NodeTextWriterFactory(int k) {
+        this.kmerSize = k;
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
+        return new ITupleWriter() {
+            NodeWritable node = new NodeWritable();
+            
+            @Override
+            public void open(DataOutput output) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
+                node.getKmer().reset(kmerSize);
+                node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+                try {
+                    output.write(node.toString().getBytes());
+                    output.writeByte('\n');
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close(DataOutput output) throws HyracksDataException {
+
+            }
+
+        };
+    }
+
+}
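This writer renders one node per line for inspecting the graph output. A sketch of plugging it into an HDFS sink, assuming a (jobSpec, conf, tupleWriterFactory) constructor for the HDFSWriteOperatorDescriptor that JobGenBrujinGraph imports below; treat that exact signature as an assumption:

    ITupleWriterFactory textWriter = new NodeTextWriterFactory(kmerSize);
    HDFSWriteOperatorDescriptor writeOp = new HDFSWriteOperatorDescriptor(
            jobSpec, hadoopJobConfFactory.getConf(), textWriter);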
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
index ecd95d5..b0edf77 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.job;
 
 import java.io.IOException;
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
similarity index 94%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
index 0ffdef2..9649566 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.job;
 
 import java.io.Serializable;
 import java.util.UUID;
@@ -26,7 +26,7 @@
 public abstract class JobGen implements Serializable {
 
     /**
-     * 
+     * generate the jobId
      */
     private static final long serialVersionUID = 1L;
     protected final ConfFactory confFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index abfff00..afc1cf7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -24,18 +24,39 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
+
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 @SuppressWarnings("deprecation")
@@ -68,7 +89,7 @@
     protected int tableSize;
     protected GroupbyType groupbyType;
     protected OutputFormat outputFormat;
-    protected boolean bGenerateReversedKmer;
+
 
     protected void logDebug(String status) {
         LOG.debug(status + " nc nodes:" + ncNodeNames.length);
@@ -85,7 +106,116 @@
         }
         initJobConfiguration(scheduler);
     }
-    
+
+    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+            throws HyracksDataException {
+
+        Object[] obj = new Object[3];
+
+        switch (groupbyType) {
+            case PRECLUSTER:
+            default:
+                obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+                        combineRed);
+                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+                        finalRec);
+                jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+                break;
+        }
+        return obj;
+    }
+
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+                            kmerSize));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
+    }
+
+    public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the field index of the group-by key
+
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                ReadsKeyValueParserFactory.readKmerOutputRec);
+        
+        connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+
+        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        jobSpec.setFrameSize(frameSize);
+
+        Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields, new AggregateKmerAggregateFactory(kmerSize),
+                new MergeKmerAggregateFactory(kmerSize), new KmerHashPartitioncomputerFactory(),
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        logDebug("LocalKmerGroupby Operator");
+        connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("CrossKmerGroupby Operator");
+        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+        return kmerCrossAggregator;
+    }
+
+    public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+        ITupleWriterFactory nodeWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                nodeWriter = new NodeTextWriterFactory(kmerSize);
+                break;
+            default:
+                throw new HyracksException("Unsupported output format: " + outputFormat);
+        }
+        logDebug("WriteOperator");
+        // Output Node
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), nodeWriter);
+        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return writeNodeOperator;
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Write node to result");
+        lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
+
+        jobSpec.addRoot(readOperator); // TODO: clarify why addRoot is needed here but not in JobGenCheckReader
+        return jobSpec;
+    }
+
     protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
         Configuration conf = confFactory.getConf();
         readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
@@ -98,18 +228,11 @@
         tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
         frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
 
-        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
         String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
-            groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
-            groupbyType = GroupbyType.PRECLUSTER;
-        } else {
-            groupbyType = GroupbyType.HYBRIDHASH;
-        }
+        groupbyType = GroupbyType.PRECLUSTER;
 
-        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        
         if (output.equalsIgnoreCase("text")) {
             outputFormat = OutputFormat.TEXT;
         } else {
@@ -131,30 +254,5 @@
         LOG.info("Frame limit" + frameLimits);
         LOG.info("Frame kmerByteSize" + frameSize);
     }
-    
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
-                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
 
-            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
-                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
-                            kmerSize, bGenerateReversedKmer));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
-            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
-        jobSpec.connect(conn, preOp, 0, nextOp, 0);
-    }
-
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-        // TODO Auto-generated method stub
-        return null;
-    }
 }
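For readers tracing the new job generator: generateAggregateDescriptorByType() hands back its local aggregator, cross-partition connector, and global aggregator as an untyped Object[3] that generateGroupbyKmerJob() must unpack with casts. A minimal sketch of a typed holder that would make the call site cast-free; this class is hypothetical and not part of this commit:

```java
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

// Hypothetical helper: names the three artifacts of the two-level group-by
// instead of returning them as Object[3].
class GroupbyOperators {
    final AbstractOperatorDescriptor localAggregator;  // pre-partition aggregation
    final IConnectorDescriptor crossPartitionConn;     // M:N partitioning/merging connector
    final AbstractOperatorDescriptor globalAggregator; // merges the partial aggregates

    GroupbyOperators(AbstractOperatorDescriptor local, IConnectorDescriptor conn,
            AbstractOperatorDescriptor global) {
        this.localAggregator = local;
        this.crossPartitionConn = conn;
        this.globalAggregator = global;
    }
}
```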
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
index 6d64cfd..f512f43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -18,12 +18,11 @@
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
+
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,12 +33,16 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenCheckReader extends JobGenBrujinGraph {
 
     private static final long serialVersionUID = 1L;
 
     public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
@@ -62,7 +65,7 @@
 
     public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
             HDFSReadOperatorDescriptor readOperator) throws HyracksException {
-        // Output Kmer
+        
         HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
                 hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
 
@@ -70,11 +73,11 @@
 
                     @Override
                     public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        KmerBytesWritable.setGlobalKmerLength(kmerSize);
                         return new ITupleWriter() {
 
-                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
-                            private KmerListWritable kmerList = new KmerListWritable();
-                            //private IntermediateNodeWritable intermediateNode = new IntermediateNodeWritable();
+                            private NodeWritable outputNode = new NodeWritable();
+                            private KmerBytesWritable outputKmer = new KmerBytesWritable();
 
                             @Override
                             public void open(DataOutput output) throws HyracksDataException {
@@ -83,36 +86,20 @@
                             @Override
                             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
                                 try {
-                                    if (kmer.getLength() > tuple
+                                    if (outputKmer.getLength() > tuple
                                             .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
                                         throw new IllegalArgumentException("Not enough kmer bytes");
                                     }
-                                    //kemr
-                                    kmer.setAsReference(
+                                    outputKmer.setAsReference(
                                             tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
                                             tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
-                                    kmerList.setAsReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputNodeIdField), 
-                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField), 
-                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
-//                                    //nodeId
-//                                    intermediateNode.getNodeId().setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField), 
-//                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
-                                    //FF list
-//                                    intermediateNode.getFFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardForwardField) / 2 ,
-//                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardForwardField));
-//                                    //FR list
-//                                    intermediateNode.getFRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardReverseField / kmer.getLength()),
-//                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardReverseField));
-//                                    //RF list
-//                                    intermediateNode.getRFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseForwardField / kmer.getLength()),
-//                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseForwardField));
-//                                    //RR list
-//                                    intermediateNode.getRRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseReverseField / kmer.getLength()),
-//                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseReverseField));
-//                                    
-                                    output.write(kmer.toString().getBytes());
+                                    outputNode.setAsReference(
+                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeField),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeField));
+
+                                    output.write(outputKmer.toString().getBytes());
                                     output.writeByte('\t');
-                                    output.write(kmerList.toString().getBytes());
+                                    output.write(outputNode.toString().getBytes());
                                     output.writeByte('\n');
                                 } catch (IOException e) {
                                     throw new HyracksDataException(e);
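The ITupleWriter above emits one line per tuple: the kmer string, a tab, then the node's toString(). A minimal sketch for reading such a dump back during debugging (the file name is an assumption):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class KmerDumpScanner {
    public static void main(String[] args) throws IOException {
        // Each line written above has the form "<kmer>\t<node>".
        try (BufferedReader reader = new BufferedReader(new FileReader("merged.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2);
                System.out.println("kmer=" + parts[0] + " node=" + parts[1]);
            }
        }
    }
}
```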
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index dfae011..25915aa 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -8,7 +8,6 @@
 import java.io.IOException;
 
 import junit.framework.Assert;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -24,16 +23,16 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
 import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver;
 import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.test.TestUtils;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+//import edu.uci.ics.genomix.oldtype.NodeWritable;
 
 @SuppressWarnings("deprecation")
 public class JobRun {
     private static final int KmerSize = 5;
-    private static final int ReadLength = 8;
+    private static final int ReadLength = 6;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
@@ -41,16 +40,7 @@
     private static final String HDFS_INPUT_PATH = "/webmap";
     private static final String HDFS_OUTPUT_PATH = "/webmap_result";
 
-    private static final String EXPECTED_DIR = "src/test/resources/expected/";
-    private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
-//    private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
-//    private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
-//    private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
-//    private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
-//    private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
-
     private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
-    private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
     private MiniDFSCluster dfsCluster;
     
@@ -63,13 +53,22 @@
     @Test
     public void TestAll() throws Exception {
         TestReader();
+//        TestGroupby();
     }
     
     public void TestReader() throws Exception {
         cleanUpReEntry();
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, null));
+        dumpResult();
+    }
+    
+    public void TestGroupby() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+//        dumpResult();
     }
     
     @Before
@@ -129,54 +128,12 @@
         }
     }
     
-    private boolean checkResults(String expectedPath, int[] poslistField) throws Exception {
-        File dumped = null;
+    private void dumpResult() throws Exception {
         String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
         if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
             FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
                     FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
-            dumped = new File(DUMPED_RESULT);
-        } else {
-
-            FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
-            File filePathTo = new File(CONVERT_RESULT);
-            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-            for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
-                String partname = "/part-" + i;
-                // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-                // + partname), FileSystem.getLocal(new Configuration()),
-                // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
-                // false, conf);
-
-                Path path = new Path(HDFS_OUTPUT_PATH + partname);
-                FileSystem dfs = FileSystem.get(conf);
-                if (dfs.getFileStatus(path).getLen() == 0) {
-                    continue;
-                }
-                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
-                NodeWritable node = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
-                NullWritable value = NullWritable.get();
-                while (reader.next(node, value)) {
-                    if (node == null) {
-                        break;
-                    }
-                    bw.write(node.toString());
-                    System.out.println(node.toString());
-                    bw.newLine();
-                }
-                reader.close();
-            }
-            bw.close();
-            dumped = new File(CONVERT_RESULT);
-        }
-
-        if (poslistField != null) {
-            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
-        } else {
-            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
-        }
-        return true;
+        } 
     }
     
     @After
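dumpResult() above leans on FileUtil.copyMerge to concatenate the HDFS part files into a single local file. A standalone sketch of the same call (the paths are assumptions; the `false` argument keeps the source part files in place):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class MergePartFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Concatenate every part-* file under /webmap_result into actual/merged.txt.
        FileUtil.copyMerge(FileSystem.get(conf), new Path("/webmap_result"),
                FileSystem.getLocal(conf), new Path("actual/merged.txt"),
                false /* do not delete source */, conf, null /* no separator string */);
    }
}
```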
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 4ce59a0..51a0d15 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.genomix.hyracks.driver.Driver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
 
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
@@ -76,10 +76,10 @@
     @Test
     public void TestAll() throws Exception {
 //        TestReader();
-        TestGroupbyKmer();
+//        TestGroupbyKmer();
 //        TestMapKmerToRead();
 //        TestGroupByReadID();
-//        TestEndToEnd();
+        TestEndToEnd();
 //        TestUnMergedNode();
     }
 
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 17770fa..3f1cd5c 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1	AATAGAAG
+1	AATAGA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
index 6695abe..4dfff11 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.pregelix.api.io.VertexReader;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
@@ -76,7 +77,8 @@
             vertexValue.setFRList(node.getFRList());
             vertexValue.setRFList(node.getRFList());
             vertexValue.setRRList(node.getRRList());
-            vertexValue.setKmer(getRecordReader().getCurrentKey());
+            // TODO make this more efficient (don't use toString)
+            vertexValue.setKmer(new VKmerBytesWritable(getRecordReader().getCurrentKey().toString()));
             vertexValue.setState(State.IS_NON);
             vertex.setVertexValue(vertexValue);
         }
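The TODO above flags the toString() round-trip as the hot spot: every vertex load re-encodes the current key to a String and parses it back. One possible shape for the efficient path, assuming VKmerBytesWritable gains a byte-level copy overload (the method and accessor names here are assumptions, not part of this commit):

```java
// Hypothetical VKmerBytesWritable method: copy the fixed-length kmer's
// 2-bit-encoded payload directly instead of round-tripping through toString().
public void setAsCopy(KmerBytesWritable other) {
    reset(other.getKmerLetterLength());       // resize for the letter count
    System.arraycopy(other.getBytes(), other.getOffset(),
            bytes, kmerStartOffset, other.getLength()); // raw encoded bytes
}
```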
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
new file mode 100644
index 0000000..0308913
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class P2PathMergeOutputFormat extends
+    BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+
+    @Override
+    public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+            TaskAttemptContext context) throws IOException, InterruptedException {
+        @SuppressWarnings("unchecked")
+        RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+        return new BinaryLoadGraphVertexWriter(recordWriter);
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+     */
+    public static class BinaryLoadGraphVertexWriter extends
+            BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
+        public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+                throws IOException, InterruptedException {
+            byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
+            if(selfFlag == State.IS_FINAL)
+                getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+        }
+    }
+}
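writeVertex() above emits a vertex only when the vertex bits of its state equal IS_FINAL. A small self-contained illustration of that mask-and-compare pattern (the flag values are assumptions; the real ones live in VertexValueWritable.State):

```java
public class FlagCheckDemo {
    // Assumed layout: the low bits carry the vertex kind, selected by VERTEX_MASK.
    static final byte IS_FINAL = 0b011;
    static final byte VERTEX_MASK = 0b111;

    static boolean isFinal(byte state) {
        // Strip unrelated flag bits, then compare the vertex code.
        return (byte) (state & VERTEX_MASK) == IS_FINAL;
    }

    public static void main(String[] args) {
        byte state = (byte) (IS_FINAL | (1 << 5)); // vertex code plus an unrelated bit
        System.out.println(isFinal(state));        // true: the extra bit is masked away
    }
}
```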
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 29872fa..b19a0cf 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -19,8 +19,8 @@
     }
     
     public AdjacencyListWritable(int kmerSize){
-        forwardList = new KmerListWritable(kmerSize);
-        reverseList = new KmerListWritable(kmerSize);
+        forwardList = new KmerListWritable();
+        reverseList = new KmerListWritable();
     }
 
     public void set(AdjacencyListWritable adjacencyList){
@@ -34,8 +34,8 @@
     }
     
     public void reset(int kmerSize){
-        forwardList.reset(kmerSize);
-        reverseList.reset(kmerSize);
+        forwardList.reset();
+        reverseList.reset();
     }
     
     public int getCountOfPosition(){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index c3e02a0..9fd15dd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -27,7 +27,7 @@
 
     public MergeBubbleMessageWritable() {
         sourceVertexId = new PositionWritable();
-        chainVertexId = new KmerBytesWritable(0);
+        chainVertexId = new KmerBytesWritable();
         neighberNode = new AdjacencyListWritable();
         startVertexId = new PositionWritable();
         message = Message.NON;
@@ -78,7 +78,7 @@
 
     public void reset() {
         checkMessage = 0;
-        chainVertexId.reset(1);
+//        chainVertexId.reset();
         neighberNode.reset();
         message = Message.NON;
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index ea91144..d9177e2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -22,12 +22,13 @@
     private byte flag;
     private boolean isFlip;
     private int kmerlength = 0;
+    private boolean updateMsg = false;
 
     private byte checkMessage;
 
     public MessageWritable() {
         sourceVertexId = new KmerBytesWritable();
-        kmer = new KmerBytesWritable(0);
+        kmer = new KmerBytesWritable();
         neighberNode = new AdjacencyListWritable();
         flag = Message.NON;
         isFlip = false;
@@ -37,7 +38,7 @@
     public MessageWritable(int kmerSize) {
         kmerlength = kmerSize;
         sourceVertexId = new KmerBytesWritable();
-        kmer = new KmerBytesWritable(0);
+        kmer = new KmerBytesWritable();
         neighberNode = new AdjacencyListWritable(kmerSize);
         flag = Message.NON;
         isFlip = false;
@@ -61,6 +62,7 @@
         }
         checkMessage |= CheckMessage.ADJMSG;
         this.flag = msg.getFlag();
+        updateMsg = msg.isUpdateMsg();
     }
 
     public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -82,17 +84,16 @@
     }
 
     public void reset() {
-        checkMessage = 0;
-        kmer.reset(1);
-        neighberNode.reset();
-        flag = Message.NON;
+        reset(0);
     }
     
     public void reset(int kmerSize) {
-        checkMessage = 0;
-        kmer.reset(1);
+        checkMessage = (byte) 0;
+        kmerlength = kmerSize;
+//        kmer.reset();
         neighberNode.reset(kmerSize);
         flag = Message.NON;
+        isFlip = false;
     }
 
     public KmerBytesWritable getSourceVertexId() {
@@ -148,6 +149,15 @@
         this.isFlip = isFlip;
     }
 
+    
+    public boolean isUpdateMsg() {
+        return updateMsg;
+    }
+
+    public void setUpdateMsg(boolean updateMsg) {
+        this.updateMsg = updateMsg;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeInt(kmerlength);
@@ -160,6 +170,7 @@
             neighberNode.write(out);
         out.writeBoolean(isFlip);
         out.writeByte(flag); 
+        out.writeBoolean(updateMsg);
     }
 
     @Override
@@ -175,6 +186,7 @@
             neighberNode.readFields(in);
         isFlip = in.readBoolean();
         flag = in.readByte();
+        updateMsg = in.readBoolean();
     }
 
     @Override
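The new updateMsg flag is appended last in write() and consumed last in readFields(), keeping the two methods symmetric. A minimal round-trip harness for checking that symmetry on any Writable (a sketch, not part of this commit):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public final class WritableRoundTrip {
    // Serialize src, then deserialize into dst; a field written and read in
    // different orders (e.g. a misplaced updateMsg) surfaces as corrupt fields.
    public static void roundTrip(Writable src, Writable dst) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        src.write(new DataOutputStream(bos));
        dst.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    }
}
```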
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index d472fd1..60ad003 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerListWritable;
 
 public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
@@ -32,15 +33,15 @@
         public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
         public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
         public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
-        public static final byte SHOULD_MERGE_CLEAR = 0b1110011;
+        public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
     }
     
     private PositionListWritable nodeIdList;
     private AdjacencyListWritable incomingList;
     private AdjacencyListWritable outgoingList;
     private byte state;
-    private KmerBytesWritable kmer;
-    private KmerBytesWritable mergeDest;
+    private VKmerBytesWritable kmer;
+    private VKmerBytesWritable mergeDest;
     private int kmerlength = 0;
 
     public VertexValueWritable() {
@@ -53,13 +54,13 @@
         incomingList = new AdjacencyListWritable();
         outgoingList = new AdjacencyListWritable();
         state = State.IS_NON;
-        kmer = new KmerBytesWritable(kmerSize);
-        mergeDest = new KmerBytesWritable(kmerSize);
+        kmer = new VKmerBytesWritable();
+        mergeDest = new VKmerBytesWritable();
     }
 
     public VertexValueWritable(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
             KmerListWritable reverseForwardList, KmerListWritable reverseReverseList,
-            byte state, KmerBytesWritable kmer) {
+            byte state, VKmerBytesWritable kmer) {
         set(nodeIdList, forwardForwardList, forwardReverseList, 
                 reverseForwardList, reverseReverseList,
                 state, kmer);
@@ -67,8 +68,8 @@
     
     public void set(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
             KmerListWritable reverseForwardList, KmerListWritable reverseReverseList, 
-            byte state, KmerBytesWritable kmer) {
-        this.kmerlength = kmer.kmerByteSize;
+            byte state, VKmerBytesWritable kmer) {
+        this.kmerlength = kmer.getKmerLetterLength();
         this.incomingList.setForwardList(reverseForwardList);
         this.incomingList.setReverseList(reverseReverseList);
         this.outgoingList.setForwardList(forwardForwardList);
@@ -149,22 +150,22 @@
     }
 
     public int getLengthOfKmer() {
-        return kmer.getKmerLength();
+        return kmer.getKmerLetterLength();
     }
 
-    public KmerBytesWritable getKmer() {
+    public VKmerBytesWritable getKmer() {
         return kmer;
     }
 
-    public void setKmer(KmerBytesWritable kmer) {
+    public void setKmer(VKmerBytesWritable kmer) {
         this.kmer.setAsCopy(kmer);
     }
     
-    public KmerBytesWritable getMergeDest() {
+    public VKmerBytesWritable getMergeDest() {
         return mergeDest;
     }
 
-    public void setMergeDest(KmerBytesWritable mergeDest) {
+    public void setMergeDest(VKmerBytesWritable mergeDest) {
         this.mergeDest = mergeDest;
     }
     
@@ -180,11 +181,11 @@
     public void reset(int kmerSize) {
         this.kmerlength = kmerSize;
         this.nodeIdList.reset();
-        this.incomingList.getForwardList().reset(kmerSize);
-        this.incomingList.getReverseList().reset(kmerSize);
-        this.outgoingList.getForwardList().reset(kmerSize);
-        this.outgoingList.getReverseList().reset(kmerSize);
-        this.kmer.reset(0);
+        this.incomingList.getForwardList().reset();
+        this.incomingList.getReverseList().reset();
+        this.outgoingList.getForwardList().reset();
+        this.outgoingList.getReverseList().reset();
+//        this.kmer.reset(0);
     }
     
     @Override
@@ -237,6 +238,26 @@
     }
     
     /*
+     * Delete the corresponding edge
+     */
+    public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+        switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+            case MessageFlag.DIR_FF:
+                this.getFFList().remove(nodeToDelete);
+                break;
+            case MessageFlag.DIR_FR:
+                this.getFRList().remove(nodeToDelete);
+                break;
+            case MessageFlag.DIR_RF:
+                this.getRFList().remove(nodeToDelete);
+                break;
+            case MessageFlag.DIR_RR:
+                this.getRRList().remove(nodeToDelete);
+                break;
+        }
+    }
+    
+    /*
      * Process any changes to value.  This is for edge updates
      */
     public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
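The SHOULD_MERGE_CLEAR correction above follows from the bit layout: SHOULD_MERGE_MASK is 0b11 << 3 = 0b0011000, so the clear constant must be its 7-bit complement 0b1100111; the old value 0b1110011 cleared bits 2 and 3 instead of 3 and 4. A quick check:

```java
public class MergeMaskCheck {
    public static void main(String[] args) {
        byte shouldMergeMask = 0b11 << 3;                   // 0b0011000
        byte clear = (byte) (~shouldMergeMask & 0b1111111); // 7-bit complement
        System.out.println(Integer.toBinaryString(clear));  // prints 1100111
    }
}
```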
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index bb60a25..95e070f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,7 +4,7 @@
 import java.util.logging.Handler;
 import java.util.logging.LogRecord;
 
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
 
 public class DataLoadLogFormatter extends Formatter {
     private NodeWritable key;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 27e2683..7d6a1b9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -12,11 +12,11 @@
     // Create a DateFormat to format the logger timestamp.
     //
     private long step;
-    private KmerBytesWritable sourceVertexId = new KmerBytesWritable(1);
-    private KmerBytesWritable destVertexId = new KmerBytesWritable(1);
+    private KmerBytesWritable sourceVertexId = new KmerBytesWritable();
+    private KmerBytesWritable destVertexId = new KmerBytesWritable();
     private MessageWritable msg = new MessageWritable();
     private byte state;
-    private KmerBytesWritable mergeChain = new KmerBytesWritable(1);;
+    private KmerBytesWritable mergeChain = new KmerBytesWritable();
     //private boolean testDelete = false;
     /**
      * 0: general operation
@@ -55,11 +55,11 @@
     }
 
     public void reset() {
-        this.sourceVertexId = new KmerBytesWritable(1);
-        this.destVertexId = new KmerBytesWritable(1);
+        this.sourceVertexId = new KmerBytesWritable();
+        this.destVertexId = new KmerBytesWritable();
         this.msg = new MessageWritable();
         this.state = 0;
-        this.mergeChain = new KmerBytesWritable(1);
+        this.mergeChain = new KmerBytesWritable();
     }
 
     public String format(LogRecord record) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9acba97..1beee03 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -13,6 +13,7 @@
 import edu.uci.ics.genomix.pregelix.type.AdjMessage;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.GeneCode;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 
 /**
@@ -25,8 +26,8 @@
     public static int kmerSize = -1;
     protected int maxIteration = -1;
     
-    protected MessageWritable incomingMsg = null; // = new MessageWritable();
-    protected MessageWritable outgoingMsg = null; // = new MessageWritable();
+    protected MessageWritable incomingMsg = null; 
+    protected MessageWritable outgoingMsg = null; 
     protected KmerBytesWritable destVertexId = new KmerBytesWritable();
     protected Iterator<KmerBytesWritable> posIterator;
     private KmerBytesWritable kmer = new KmerBytesWritable();
@@ -96,10 +97,12 @@
     public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
         if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
             posIterator = value.getFFList().iterator();
+            outFlag &= MessageFlag.DIR_CLEAR;
             outFlag |= MessageFlag.DIR_FF;
             return posIterator.next();
         } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
             posIterator = value.getFRList().iterator();
+            outFlag &= MessageFlag.DIR_CLEAR;
             outFlag |= MessageFlag.DIR_FR;
             return posIterator.next();
         } else {
@@ -111,10 +114,12 @@
     public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
         if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
             posIterator = value.getRFList().iterator();
+            outFlag &= MessageFlag.DIR_CLEAR;
             outFlag |= MessageFlag.DIR_RF;
             return posIterator.next();
         } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
             posIterator = value.getRRList().iterator();
+            outFlag &= MessageFlag.DIR_CLEAR;
             outFlag |= MessageFlag.DIR_RR;
             return posIterator.next();
         } else {
@@ -290,6 +295,7 @@
      * set adjMessage to successor(from predecessor)
      */
     public void setSuccessorAdjMsg(){
+        outFlag &= MessageFlag.DIR_CLEAR;
         if(getVertexValue().getFFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_FF;
         else if(getVertexValue().getFRList().getLength() > 0)
@@ -302,6 +308,7 @@
      * set adjMessage to predecessor(from successor)
      */
     public void setPredecessorAdjMsg(){
+        outFlag &= MessageFlag.DIR_CLEAR;
         if(getVertexValue().getRFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_RF;
         else if(getVertexValue().getRRList().getLength() > 0)
@@ -342,18 +349,38 @@
     }
     
     /**
+     * send update message to neighbor for P2
+     * @throws IOException 
+     */
+    public void sendUpdateMsg(){
+        outgoingMsg.setUpdateMsg(true);
+        byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        switch(meToNeighborDir){
+            case MessageFlag.DIR_FF:
+            case MessageFlag.DIR_FR:
+                sendUpdateMsgToPredecessor();
+                break;
+            case MessageFlag.DIR_RF:
+            case MessageFlag.DIR_RR: 
+                sendUpdateMsgToSuccessor();
+                break;
+        }
+    }
+    
+    /**
     * send merge message to neighbor for P2
      * @throws IOException 
      */
     public void sendMergeMsg(){
-        if(selfFlag == MessageFlag.IS_HEAD){
+        outgoingMsg.setUpdateMsg(false);
+        if(selfFlag == State.IS_HEAD){
             byte newState = getVertexValue().getState(); 
             newState &= ~State.IS_HEAD;
             newState |= State.IS_OLDHEAD;
             getVertexValue().setState(newState);
             resetSelfFlag();
-            outFlag |= MessageFlag.IS_HEAD;
-        } else if(selfFlag == MessageFlag.IS_OLDHEAD){
+            outFlag |= MessageFlag.IS_HEAD;  
+        } else if(selfFlag == State.IS_OLDHEAD){
             outFlag |= MessageFlag.IS_OLDHEAD;
             voteToHalt();
         }
@@ -370,7 +397,7 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
                 sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
                 break;
             case MessageFlag.DIR_RF:
@@ -383,7 +410,47 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+                sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
+                break; 
+        }
+//        if(headBecomeOldHead)
+//            getVertexValue().processDelete(neighborToMeDir, incomingMsg.getSourceVertexId());
+    }
+    
+    /**
+     * send final merge message to neighbor for P2
+     * @throws IOException 
+     */
+    public void sendFinalMergeMsg(){
+        outFlag |= MessageFlag.IS_FINAL;
+        byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+        switch(neighborToMeDir){
+            case MessageFlag.DIR_FF:
+            case MessageFlag.DIR_FR:
+                setSuccessorAdjMsg();
+                if(ifFlipWithPredecessor())
+                    outgoingMsg.setFlip(true);
+                else
+                    outgoingMsg.setFlip(false);
+                outgoingMsg.setFlag(outFlag);
+                outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+                outgoingMsg.setSourceVertexId(getVertexId());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+                sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
+                break;
+            case MessageFlag.DIR_RF:
+            case MessageFlag.DIR_RR:
+                setPredecessorAdjMsg();
+                if(ifFilpWithSuccessor())
+                    outgoingMsg.setFlip(true);
+                else
+                    outgoingMsg.setFlip(false);
+                outgoingMsg.setFlag(outFlag);
+                outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+                outgoingMsg.setSourceVertexId(getVertexId());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
                 sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
                 break; 
         }
@@ -406,7 +473,7 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
                 sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
                 deleteVertex(getVertexId());
                 break;
@@ -419,7 +486,7 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+                outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
                 sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
                 deleteVertex(getVertexId());
                 break; 
@@ -428,6 +495,7 @@
     
     public void setStateAsMergeWithNext(){
     	byte state = getVertexValue().getState();
+    	state &= State.SHOULD_MERGE_CLEAR;
         state |= State.SHOULD_MERGEWITHNEXT;
         getVertexValue().setState(state);
     }
@@ -445,6 +513,7 @@
     
     public void setStateAsMergeWithPrev(){
         byte state = getVertexValue().getState();
+        state &= State.SHOULD_MERGE_CLEAR;
         state |= State.SHOULD_MERGEWITHPREV;
         getVertexValue().setState(state);
     }
@@ -551,7 +620,7 @@
         byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
         byte neighborToMeDir = mirrorDirection(meToNeighborDir);
 
-        byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
+        byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
         
         getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(), 
                 neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
@@ -559,6 +628,60 @@
     }
     
     /**
+     * final merge and adjacency-list update, parameterized for P2
+     */
+    public void processFinalMerge(MessageWritable msg){
+        byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
+        byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+        byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
+        
+        String selfString;
+        String match;
+        String msgString;
+        int index;
+        switch(neighborToMeDir){
+            case MessageFlag.DIR_FF:
+                selfString = getVertexValue().getKmer().toString();
+                match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length()); 
+                msgString = msg.getKmer().toString();
+                index = msgString.indexOf(match);
+//                kmer.reset(msgString.length() - index);
+                kmer.setByRead(msgString.substring(index).getBytes(), 0);
+                break;
+            case MessageFlag.DIR_FR:
+                selfString = getVertexValue().getKmer().toString();
+                match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length()); 
+                msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+                index = msgString.indexOf(match);
+//                kmer.reset(msgString.length() - index);
+                kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+                break;
+            case MessageFlag.DIR_RF:
+                selfString = getVertexValue().getKmer().toString();
+                match = selfString.substring(0,kmerSize - 1); 
+                msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+                index = msgString.lastIndexOf(match) + kmerSize - 2;
+//                kmer.reset(index + 1);
+                kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+                break;
+            case MessageFlag.DIR_RR:
+                selfString = getVertexValue().getKmer().toString();
+                match = selfString.substring(0,kmerSize - 1); 
+                msgString = msg.getKmer().toString();
+                index = msgString.lastIndexOf(match) + kmerSize - 2;
+//                kmer.reset(index + 1);
+                kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+                break;
+        }
+       
+        getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(), 
+                neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
+                kmerSize, kmer);
+    }
+    
+    /**
      * set head state
      */
     public void setHeadState(){
@@ -579,12 +702,12 @@
     }
     
     /**
-     * set final state
+     * set stop flag
      */
     public void setStopFlag(){
-        byte state = incomingMsg.getFlag();
+        byte state = getVertexValue().getState();
         state &= State.VERTEX_CLEAR;
-        state |= State.IS_STOP;
+        state |= State.IS_FINAL;
         getVertexValue().setState(state);
     }
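processFinalMerge() above locates the (kmerSize - 1)-letter overlap between this vertex's kmer and the incoming one, then rebuilds the merged sequence from the matching suffix or prefix. A standalone illustration of the DIR_FF case (sequences and kmerSize are made up for the example):

```java
public class FFOverlapDemo {
    public static void main(String[] args) {
        int kmerSize = 3;
        String self = "AACAA";  // this vertex's (already merged) kmer
        String msg = "CAAGG";   // the successor's kmer, forward-forward direction
        // The last kmerSize - 1 letters of self must occur in msg...
        String match = self.substring(self.length() - kmerSize + 1); // "AA"
        int index = msg.indexOf(match);                              // 1
        // ...and msg contributes everything after that overlap.
        String merged = self + msg.substring(index + match.length());
        System.out.println(merged); // AACAAGG
    }
}
```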
     
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index feb22aa..b6f2164 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -4,14 +4,15 @@
 import java.util.Iterator;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
 import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 /*
  * vertexId: BytesWritable
  * vertexValue: VertexValueWritable
@@ -44,7 +45,7 @@
     BasicPathMergeVertex {
 
     private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
-    PositionWritable tempPostition = new PositionWritable();
+    KmerBytesWritable tmpKmer = new KmerBytesWritable();
 
     /**
      * initiate kmerSize, maxIteration
@@ -54,9 +55,14 @@
             kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
         if (maxIteration < 0)
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-        headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
-        selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
-        outgoingMsg.reset();
+        headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
+        selfFlag = (byte)(getVertexValue().getState() & State.VERTEX_MASK);
+        if(incomingMsg == null)
+            incomingMsg = new MessageWritable(kmerSize);
+        if(outgoingMsg == null)
+            outgoingMsg = new MessageWritable(kmerSize);
+        else
+            outgoingMsg.reset(kmerSize);
         receivedMsgList.clear();
     }
 
@@ -65,18 +71,18 @@
      */
     public void sendOutMsg() {
         //send wantToMerge to next
-        tempPostition = getNextDestVertexIdAndSetFlag(getVertexValue());
-        if(tempPostition != null){
-            destVertexId.setAsCopy(tempPostition);
+        tmpKmer = getNextDestVertexIdAndSetFlag(getVertexValue());
+        if(tmpKmer != null){
+            destVertexId.setAsCopy(tmpKmer);
             outgoingMsg.setFlag(outFlag);
             outgoingMsg.setSourceVertexId(getVertexId());
             sendMsg(destVertexId, outgoingMsg);
         }
         
-        ////send wantToMerge to prev
-        tempPostition = getPreDestVertexIdAndSetFlag(getVertexValue());
-        if(tempPostition != null){
-            destVertexId.setAsCopy(tempPostition);
+        //send wantToMerge to prev
+        tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+        if(tmpKmer != null){
+            destVertexId.setAsCopy(tmpKmer);
             outgoingMsg.setFlag(outFlag);
             outgoingMsg.setSourceVertexId(getVertexId());
             sendMsg(destVertexId, outgoingMsg);
@@ -117,7 +123,7 @@
      */
     public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
         //send out wantToMerge msg
-        if(selfFlag != MessageFlag.IS_HEAD){
+        if(selfFlag != State.IS_HEAD && selfFlag != State.IS_OLDHEAD){
                 sendOutMsg();
         }
     }
@@ -126,17 +132,19 @@
      * path response message to head
      */
     public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
-        if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
-            getVertexValue().setState(MessageFlag.IS_STOP);
+        if(!msgIterator.hasNext() && selfFlag == State.IS_HEAD){
+            outFlag |= MessageFlag.IS_FINAL;
             sendOutMsg();
         }
         while (msgIterator.hasNext()) {
             incomingMsg = msgIterator.next();
             if(getMsgFlag() == MessageFlag.IS_FINAL){
                 processMerge(incomingMsg);
-                getVertexValue().setState(MessageFlag.IS_FINAL);
-            }else
+                getVertexValue().setState(State.IS_FINAL);
+            }else{
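+                // not final yet: send the update message before the merge message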
+                sendUpdateMsg();
                 sendMergeMsg();
+            }
         }
     }
 
@@ -148,11 +156,13 @@
         while (msgIterator.hasNext()) {
             incomingMsg = msgIterator.next();
             if(getMsgFlag() == MessageFlag.IS_FINAL){
-                setStopFlag();
-                sendMergeMsg();
+                sendFinalMergeMsg();
                 break;
             }
-            receivedMsgList.add(incomingMsg);
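+            // update messages are applied by old heads only; merge messages are collected below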
+            if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+                processUpdate();
+            else if(!incomingMsg.isUpdateMsg())
+                receivedMsgList.add(incomingMsg);
         }
         if(receivedMsgList.size() != 0){
             byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -160,22 +170,22 @@
                 case MessageFromHead.BothMsgsFromHead:
                 case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
                     for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_FINAL);
+                        processFinalMerge(receivedMsgList.get(i)); // formerly processMerge()
+                    getVertexValue().setState(State.IS_FINAL);
                     voteToHalt();
                     break;
                 case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
                     for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_HEAD);
+                        processFinalMerge(receivedMsgList.get(i));
+                    getVertexValue().setState(State.IS_HEAD);
                     break;
                 case MessageFromHead.BothMsgsFromNonHead:
                     for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
+                        processFinalMerge(receivedMsgList.get(i));
                     break;
                 case MessageFromHead.NO_MSG:
                     //halt
-                    deleteVertex(getVertexId());
+                    voteToHalt(); // formerly deleteVertex(getVertexId())
                     break;
             }
         }
@@ -188,12 +198,21 @@
         else if (getSuperstep() == 2)
             initState(msgIterator);
         else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
-            sendMsgToPathVertex(msgIterator);
-            if(selfFlag != MessageFlag.IS_HEAD)
-                voteToHalt();
+            if(msgIterator.hasNext()){ // handle a pending final-merge message
+                incomingMsg = msgIterator.next();
+                if(getMsgFlag() == MessageFlag.IS_FINAL){
+                    setFinalState();
+                    processFinalMerge(incomingMsg);
+                }
+            }
+            else{
+                sendMsgToPathVertex(msgIterator);
+                if(selfFlag != State.IS_HEAD)
+                    voteToHalt();
+            }
         } else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
             responseMsgToHeadVertex(msgIterator);
-            if(selfFlag != MessageFlag.IS_HEAD)
+            if(selfFlag != State.IS_HEAD)
                 voteToHalt();
         } else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
             processMergeInHeadVertex(msgIterator);
@@ -207,9 +226,9 @@
         /**
          * BinaryInput and BinaryOutput
          */
-        job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
-        job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
-        job.setOutputKeyClass(PositionWritable.class);
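+        // the graph is now keyed by kmer instead of position, so the key class and formats change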
+        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+        job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+        job.setOutputKeyClass(KmerBytesWritable.class);
         job.setOutputValueClass(VertexValueWritable.class);
         job.setDynamicVertexValueSize(true);
         Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index ac43c62..53e46af 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -60,7 +60,7 @@
     private MessageWritable outgoingMsg = new MessageWritable();
   
     private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
-    private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
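+    // the kmer length is now configured globally, so the constructor takes no length argument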
+    private KmerBytesWritable lastKmer = new KmerBytesWritable();
     
     private PositionWritable destVertexId = new PositionWritable();
     private Iterator<PositionWritable> posIterator;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index e6c5f34..7a22d25 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -78,8 +78,8 @@
             incomingMsg = new MessageWritable(kmerSize);
         if(outgoingMsg == null)
             outgoingMsg = new MessageWritable(kmerSize);
-        //if (randSeed < 0)
-        //    randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        else
+            outgoingMsg.reset(kmerSize);
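+        // seeding with the superstep makes every vertex draw the same random sequence in a round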
         randSeed = getSuperstep();
         randGenerator = new Random(randSeed);
         if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index 6b9eb4e..d3180c8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -28,7 +28,8 @@
         SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
         SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
                 NullWritable.class, CompressionType.NONE);
-        KmerBytesWritable outKey = new KmerBytesWritable(55);
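+        // kmer length moved out of the constructor into a process-wide setting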
+        KmerBytesWritable.setGlobalKmerLength(55);
+        KmerBytesWritable outKey = new KmerBytesWritable();
         int i = 0;
 
         for (i = 0; i < numOfLines; i++) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index bc08600..8618237 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -19,6 +19,7 @@
 public class GenerateTextFile {
 
     public static void generateFromPathmergeResult(int kmerSize, String strSrcDir, String outPutDir) throws IOException {
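+        // set the global kmer length before reading any KmerBytesWritable keys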
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
         Configuration conf = new Configuration();
         FileSystem fileSys = FileSystem.getLocal(conf);
 
@@ -44,13 +45,14 @@
     }
 
     public static void generateSpecificLengthChainFromNaivePathmergeResult(int maxLength) throws IOException {
+        KmerBytesWritable.setGlobalKmerLength(55);
         BufferedWriter bw = new BufferedWriter(new FileWriter("naive_text_" + maxLength));
         Configuration conf = new Configuration();
         FileSystem fileSys = FileSystem.get(conf);
         for (int i = 0; i < 2; i++) {
             Path path = new Path("/home/anbangx/genomix_result/final_naive/part-" + i);
             SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
-            KmerBytesWritable key = new KmerBytesWritable(55);
+            KmerBytesWritable key = new KmerBytesWritable();
             VertexValueWritable value = new VertexValueWritable();
 
             while (reader.next(key, value)) {
@@ -68,13 +70,14 @@
     }
 
     public static void generateSpecificLengthChainFromLogPathmergeResult(int maxLength) throws IOException {
+        KmerBytesWritable.setGlobalKmerLength(55);
         BufferedWriter bw = new BufferedWriter(new FileWriter("log_text_" + maxLength));
         Configuration conf = new Configuration();
         FileSystem fileSys = FileSystem.get(conf);
         for (int i = 0; i < 2; i++) {
             Path path = new Path("/home/anbangx/genomix_result/improvelog2/part-" + i);
             SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
-            KmerBytesWritable key = new KmerBytesWritable(55);
+            KmerBytesWritable key = new KmerBytesWritable();
             VertexValueWritable value = new VertexValueWritable();
 
             while (reader.next(key, value)) {
@@ -93,12 +96,13 @@
     }
 
     public static void generateFromGraphbuildResult() throws IOException {
+        KmerBytesWritable.setGlobalKmerLength(55);
         BufferedWriter bw = new BufferedWriter(new FileWriter("textfile"));
         Configuration conf = new Configuration();
         FileSystem fileSys = FileSystem.get(conf);
         Path path = new Path("data/input/part-0-out-3000000");
         SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
-        KmerBytesWritable key = new KmerBytesWritable(55);
+        KmerBytesWritable key = new KmerBytesWritable();
 
         while (reader.next(key, null)) {
             if (key == null) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 6e1bc4b..bee1dd8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -9,4 +9,5 @@
     public static final byte DIR_RF = 0b011 << 0;
     public static final byte DIR_RR = 0b100 << 0;
     public static final byte DIR_MASK = 0b111 << 0;
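+    // AND a flag with this mask to clear its direction bits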
+    public static final byte DIR_CLEAR = 0b1111000 << 0;
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 0e798fe..e18c31b 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
 import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
 import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -42,21 +43,21 @@
 //                + "NaiveAlgorithmForMergeGraph.xml");
 //    }
 
-//    private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
-//        PregelixJob job = new PregelixJob(jobName);
-//        job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
-//        job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
-//        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
-//        job.setDynamicVertexValueSize(true);
-//        job.setOutputKeyClass(PositionWritable.class);
-//        job.setOutputValueClass(VertexValueWritable.class);
-//        job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
-//        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-//    }
-//
-//    private static void genLogAlgorithmForMergeGraph() throws IOException {
-//        generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
-//    }
+    private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+        job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class); 
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genLogAlgorithmForMergeGraph() throws IOException {
+        generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+    }
 //    
 //    private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
 //        PregelixJob job = new PregelixJob(jobName);
@@ -77,22 +78,22 @@
 //                + "P3ForMergeGraph.xml");
 //    }
     
-    private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
-        PregelixJob job = new PregelixJob(jobName);
-        job.setVertexClass(P4ForPathMergeVertex.class);
-        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
-        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-        job.setDynamicVertexValueSize(true);
-        job.setOutputKeyClass(KmerBytesWritable.class);
-        job.setOutputValueClass(VertexValueWritable.class);
-        job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
-        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-    }
-
-    private static void genP4ForMergeGraph() throws IOException {
-        generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
-                + "P4ForMergeGraph.xml");
-    }
+//    private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+//        PregelixJob job = new PregelixJob(jobName);
+//        job.setVertexClass(P4ForPathMergeVertex.class);
+//        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+//        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+//        job.setDynamicVertexValueSize(true);
+//        job.setOutputKeyClass(KmerBytesWritable.class);
+//        job.setOutputValueClass(VertexValueWritable.class);
+//        job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+//        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+//    }
+//
+//    private static void genP4ForMergeGraph() throws IOException {
+//        generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+//                + "P4ForMergeGraph.xml");
+//    }
     
 //    private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
 //        PregelixJob job = new PregelixJob(jobName);
@@ -198,7 +199,7 @@
     
     public static void main(String[] args) throws IOException {
         //genNaiveAlgorithmForMergeGraph();
-        //genLogAlgorithmForMergeGraph();
+        genLogAlgorithmForMergeGraph();
         //genP3ForMergeGraph();
         //genTipAddGraph();
 //        genTipRemoveGraph();
@@ -206,7 +207,7 @@
 //        genBridgeRemoveGraph();
 //        genBubbleAddGraph();
 //        genBubbleMergeGraph();
-        genP4ForMergeGraph();
+//        genP4ForMergeGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 5aedeb7..1578dfc 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -52,7 +52,7 @@
 //    + "6", PreFix + File.separator
 //    + "7", PreFix + File.separator
 //    + "8", PreFix + File.separator
-    + "4"};
+    + "9"};
     private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
     private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
deleted file mode 100644
index 597e5c3..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P4ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.KmerBytesWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
index 3d007d2..5a15ca0 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
@@ -1 +1 @@
-P4ForMergeGraph.xml
+LogAlgorithmForMergeGraph.xml