Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix

Conflicts:
	genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
similarity index 97%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
index 08cfa9d..220f45c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.type;
+package edu.uci.ics.genomix.oldtype;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -7,6 +7,9 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
 public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
 
     private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
index 3931e7f..9fc1829 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
@@ -211,13 +211,13 @@
     @Override
     public String toString() {
         StringBuilder sbuilder = new StringBuilder();
-        sbuilder.append('(');
+        sbuilder.append('{');
         sbuilder.append(nodeID.toString()).append('\t');
         sbuilder.append(forwardForwardList.toString()).append('\t');
         sbuilder.append(forwardReverseList.toString()).append('\t');
         sbuilder.append(reverseForwardList.toString()).append('\t');
         sbuilder.append(reverseReverseList.toString()).append('\t');
-        sbuilder.append(kmer.toString()).append(')');
+        sbuilder.append(kmer.toString()).append('}');
         return sbuilder.toString();
     }
 
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index 0f91a7c..eb0bd59 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -17,7 +17,7 @@
     protected byte[] storage;
     protected int offset;
     protected int valueCount;
-    public int kmerByteSize = 0; //default kmerSize = 5, kmerByteSize = 2, fix length once setting
+    public int kmerByteSize = 0; 
     public int kmerlength = 0;
     protected static final byte[] EMPTY = {};
     
@@ -29,13 +29,10 @@
         this.offset = 0;
     }
     
-    public KmerListWritable(int kmerSize) {
+    public KmerListWritable(int kmerlength) {
         this();
-        this.kmerByteSize = KmerUtil.getByteNumFromK(kmerSize);;
-    }
-    
-    public KmerListWritable(int count, byte[] data, int offset) {
-        setNewReference(count, data, offset);
+        this.kmerlength = kmerlength;
+        this.kmerByteSize = KmerUtil.getByteNumFromK(kmerlength);
     }
     
     public KmerListWritable(int kmerlength, int count, byte[] data, int offset) {
@@ -66,6 +63,20 @@
         valueCount += 1;
     }
     
+    /*
+     * Append the otherList to the end of myList
+     */
+    public void appendList(KmerListWritable otherList) {
+        if (otherList.valueCount > 0) {
+            setSize((valueCount + otherList.valueCount) * kmerByteSize);
+            // copy contents of otherList into the end of my storage
+            System.arraycopy(otherList.storage, otherList.offset,
+                    storage, offset + valueCount * kmerByteSize, 
+                    otherList.valueCount * kmerByteSize);
+            valueCount += otherList.valueCount;
+        }
+    }
+    
     protected void setSize(int size) {
         if (size > getCapacity()) {
             setCapacity((size * 3 / 2));
@@ -87,7 +98,9 @@
         }
     }
     
-    public void reset() {
+    public void reset(int kmerSize) {
+        kmerlength = kmerSize;
+        kmerByteSize = KmerUtil.getByteNumFromK(kmerlength);
         storage = EMPTY;
         valueCount = 0;
         offset = 0;
@@ -177,10 +190,6 @@
     public String toString() {
         StringBuilder sbuilder = new StringBuilder();
         sbuilder.append('[');
-//        for (KmerBytesWritable kmer : this) {
-//            sbuilder.append(kmer.toString());
-//            sbuilder.append(',');
-//        }
         for(int i = 0; i < valueCount; i++){
             sbuilder.append(getPosition(i).toString());
             sbuilder.append(',');
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
new file mode 100644
index 0000000..4725e30
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -0,0 +1,242 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+
+
+public class NodeWritable implements WritableComparable<NodeWritable>, Serializable{
+
+//    public static class KMER{
+//        public static final byte EXIST = 0;
+//        public static final byte NON_EXIST = 1;
+//    }
+    
+    private static final long serialVersionUID = 1L;
+    public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
+    
+    private PositionListWritable nodeIdList;
+    private KmerListWritable forwardForwardList;
+    private KmerListWritable forwardReverseList;
+    private KmerListWritable reverseForwardList;
+    private KmerListWritable reverseReverseList;
+    private KmerBytesWritable kmer;
+//    private byte kmerMark;
+    
+    // merge/update directions
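+    // the low two bits encode an edge direction: the first letter is this kmer's strand, the second the neighbor's (F = forward, R = reverse-complement)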
+    public static class DirectionFlag {
+        public static final byte DIR_FF = 0b00 << 0;
+        public static final byte DIR_FR = 0b01 << 0;
+        public static final byte DIR_RF = 0b10 << 0;
+        public static final byte DIR_RR = 0b11 << 0;
+        public static final byte DIR_MASK = 0b11 << 0;
+    }
+    
+    public NodeWritable() {
+        this(0);
+    }
+    
+    public NodeWritable(int kmerSize) {
+        nodeIdList = new PositionListWritable();
+        forwardForwardList = new KmerListWritable(kmerSize);
+        forwardReverseList = new KmerListWritable(kmerSize);
+        reverseForwardList = new KmerListWritable(kmerSize);
+        reverseReverseList = new KmerListWritable(kmerSize);
+        kmer = new KmerBytesWritable(kmerSize);
+//        kmerMark = KMER.NON_EXIST;
+    }
+    
+    public NodeWritable(PositionListWritable nodeIdList, KmerListWritable FFList, KmerListWritable FRList,
+            KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
+        this(kmer.getKmerLength());
+        set(nodeIdList, FFList, FRList, RFList, RRList, kmer);
+    }
+    
+    public void set(NodeWritable node){
+        set(node.nodeIdList, node.forwardForwardList, node.forwardReverseList, node.reverseForwardList, 
+                node.reverseReverseList, node.kmer);
+    }
+    
+    public void set(PositionListWritable nodeIdList, KmerListWritable FFList, KmerListWritable FRList,
+            KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
+        this.nodeIdList.set(nodeIdList);
+        this.forwardForwardList.set(FFList);
+        this.forwardReverseList.set(FRList);
+        this.reverseForwardList.set(RFList);
+        this.reverseReverseList.set(RRList);
+        this.kmer.set(kmer);
+//        kmerMark = KMER.EXIST;
+    }
+
+    public void reset(int kmerSize) {
+        nodeIdList.reset();
+        forwardForwardList.reset(kmerSize);
+        forwardReverseList.reset(kmerSize);
+        reverseForwardList.reset(kmerSize);
+        reverseReverseList.reset(kmerSize);
+        kmer.reset(kmerSize);
+//        kmerMark = KMER.NON_EXIST;
+    }
+    
+    
+    public PositionListWritable getNodeIdList() {
+        return nodeIdList;
+    }
+
+    public void setNodeIdList(PositionListWritable nodeIdList) {
+        this.nodeIdList.set(nodeIdList);
+    }
+
+    public KmerBytesWritable getKmer() {
+        return kmer;
+    }
+
+    public void setKmer(KmerBytesWritable kmer) {
+//        kmerMark = KMER.EXIST;
+        this.kmer.set(kmer);
+    }
+    
+    public int getCount() {
+        return kmer.getKmerLength();
+    }
+    
+    public KmerListWritable getFFList() {
+        return forwardForwardList;
+    }
+
+    public KmerListWritable getFRList() {
+        return forwardReverseList;
+    }
+
+    public KmerListWritable getRFList() {
+        return reverseForwardList;
+    }
+
+    public KmerListWritable getRRList() {
+        return reverseReverseList;
+    }
+    
+	public void setFFList(KmerListWritable forwardForwardList) {
+		this.forwardForwardList.set(forwardForwardList);
+	}
+
+	public void setFRList(KmerListWritable forwardReverseList) {
+		this.forwardReverseList.set(forwardReverseList);
+	}
+
+	public void setRFList(KmerListWritable reverseForwardList) {
+		this.reverseForwardList.set(reverseForwardList);
+	}
+
+	public void setRRList(KmerListWritable reverseReverseList) {
+		this.reverseReverseList.set(reverseReverseList);
+	}
+
+	public KmerListWritable getListFromDir(byte dir) {
+        switch (dir & DirectionFlag.DIR_MASK) {
+            case DirectionFlag.DIR_FF:
+                return getFFList();
+            case DirectionFlag.DIR_FR:
+                return getFRList();
+            case DirectionFlag.DIR_RF:
+                return getRFList();
+            case DirectionFlag.DIR_RR:
+                return getRRList();
+            default:
+                throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
+        }
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
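+        // NOTE: only the FF and FR edge lists are serialized at the moment; nodeIdList, the RF/RR lists and the kmer are still commented out below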
+//        out.writeByte(kmerMark);
+//        this.nodeIdList.write(out);
+        this.forwardForwardList.write(out);
+        this.forwardReverseList.write(out);
+//        this.reverseForwardList.write(out);
+//        this.reverseReverseList.write(out);
+//        if(kmerMark == KMER.EXIST) 
+//        this.kmer.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+//        kmerMark = in.readByte();
+//        this.nodeIdList.readFields(in);
+        this.forwardForwardList.readFields(in);
+        this.forwardReverseList.readFields(in);
+//        this.reverseForwardList.readFields(in);
+//        this.reverseReverseList.readFields(in);
+//        if(kmerMark == KMER.EXIST)
+//        this.kmer.readFields(in);
+    }
+
+    @Override
+    public int compareTo(NodeWritable other) {
+        return this.kmer.compareTo(other.kmer);
+    }
+    
+    @Override
+    public int hashCode() {
+        return this.kmer.hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof NodeWritable) {
+            NodeWritable nw = (NodeWritable) o;
+            return (this.nodeIdList.equals(nw.nodeIdList)
+                    && this.forwardForwardList.equals(nw.forwardForwardList)
+                    && this.forwardReverseList.equals(nw.forwardReverseList)
+                    && this.reverseForwardList.equals(nw.reverseForwardList)
+                    && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+        }
+        return false;
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('(');
+        sbuilder.append(nodeIdList.toString()).append('\t');
+        sbuilder.append(forwardForwardList.toString()).append('\t');
+        sbuilder.append(forwardReverseList.toString()).append('\t');
+        sbuilder.append(reverseForwardList.toString()).append('\t');
+        sbuilder.append(reverseReverseList.toString()).append('\t');
+        sbuilder.append(kmer.toString()).append(')');
+        return sbuilder.toString();
+    }
+
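+    // absorb nextNode's outgoing (FF/FR) edge lists and extend this node's kmer with nextNode's kmer via an FF-style merge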
+    public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
+        this.forwardForwardList.set(nextNode.forwardForwardList);
+        this.forwardReverseList.set(nextNode.forwardReverseList);
+        kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
+    }
+
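+    // absorb preNode's incoming (RF/RR) edge lists and extend this node's kmer with preNode's kmer via an RR-style merge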
+    public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
+        this.reverseForwardList.set(preNode.reverseForwardList);
+        this.reverseReverseList.set(preNode.reverseReverseList);
+        kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
+    }
+    
+    public int inDegree() {
+        return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+    }
+
+    public int outDegree() {
+        return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+    }
+
+    /*
+     * Return whether this node is a "path" (compressible) node, i.e. it has an in-degree and out-degree of exactly 1
+     */
+    public boolean isPathNode() {
+        return inDegree() == 1 && outDegree() == 1;
+    }
+
+    public boolean isSimpleOrTerminalPath() {
+        return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
deleted file mode 100644
index 7afc1ef..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.hadoop.io.Writable;
-import edu.uci.ics.genomix.data.Marshal;
-public class ReadIDListWritable implements Writable, Iterable<ReadIDWritable>, Serializable{
-    private static final long serialVersionUID = 1L;
-    protected byte[] storage;
-    protected int offset;
-    protected int valueCount;
-    protected static final byte[] EMPTY = {};
-    
-    protected ReadIDWritable posIter = new ReadIDWritable();
-    
-    public ReadIDListWritable() {
-        this.storage = EMPTY;
-        this.valueCount = 0;
-        this.offset = 0;
-    }
-    
-    public ReadIDListWritable(int count, byte[] data, int offset) {
-        setNewReference(count, data, offset);
-    }
-    
-    public ReadIDListWritable(List<ReadIDWritable> posns) {
-        this();
-        setSize(posns.size());  // reserve space for all elements
-        for (ReadIDWritable p : posns) {
-            append(p);
-        }
-    }
-    
-    public void setNewReference(int count, byte[] data, int offset) {
-        this.valueCount = count;
-        this.storage = data;
-        this.offset = offset;
-    }
-    
-    public void append(long readIDByte) {
-        setSize((1 + valueCount) * ReadIDWritable.LENGTH);
-        Marshal.putLong(readIDByte, storage, offset + valueCount * ReadIDWritable.LENGTH);
-        valueCount += 1;
-    }
-    
-    public void append(byte mateId, long readId){
-        append(ReadIDWritable.serialize(mateId, readId));
-    }
-    
-    public void append(ReadIDWritable pos) {
-        if(pos != null)
-            append(pos.deserialize());
-        else
-            throw new RuntimeException("This position is null pointer!");
-    }
-    
-    /*
-     * Append the otherList to the end of myList
-     */
-    public void appendList(ReadIDListWritable otherList) {
-        if (otherList.valueCount > 0) {
-            setSize((valueCount + otherList.valueCount) * ReadIDWritable.LENGTH);
-            // copy contents of otherList into the end of my storage
-            System.arraycopy(otherList.storage, otherList.offset,
-                    storage, offset + valueCount * ReadIDWritable.LENGTH, 
-                    otherList.valueCount * ReadIDWritable.LENGTH);
-            valueCount += otherList.valueCount;
-        }
-    }
-    
-    public static int getCountByDataLength(int length) {
-        if (length % ReadIDWritable.LENGTH != 0) {
-            throw new IllegalArgumentException("Length of positionlist is invalid");
-        }
-        return length / ReadIDWritable.LENGTH;
-    }
-    
-    public void set(ReadIDListWritable otherList) {
-        set(otherList.valueCount, otherList.storage, otherList.offset);
-    }
-
-    public void set(int valueCount, byte[] newData, int offset) {
-        this.valueCount = valueCount;
-        setSize(valueCount * ReadIDWritable.LENGTH);
-        if (valueCount > 0) {
-            System.arraycopy(newData, offset, storage, this.offset, valueCount * ReadIDWritable.LENGTH);
-        }
-    }
-
-    public void reset() {
-        valueCount = 0;
-    }
-    
-    protected void setSize(int size) {
-        if (size > getCapacity()) {
-            setCapacity((size * 3 / 2));
-        }
-    }
-    
-    protected int getCapacity() {
-        return storage.length - offset;
-    }
-
-    protected void setCapacity(int new_cap) {
-        if (new_cap > getCapacity()) {
-            byte[] new_data = new byte[new_cap];
-            if (storage.length - offset > 0) {
-                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
-            }
-            storage = new_data;
-            offset = 0;
-        }
-    }
-    
-    public ReadIDWritable getPosition(int i) {
-        if (i >= valueCount) {
-            throw new ArrayIndexOutOfBoundsException("No such positions");
-        }
-        posIter.setNewReference(storage, offset + i * ReadIDWritable.LENGTH);
-        return posIter;
-    }
-    
-    public void resetPosition(int i, long readIDByte) {
-        if (i >= valueCount) {
-            throw new ArrayIndexOutOfBoundsException("No such positions");
-        }
-        Marshal.putLong(readIDByte, storage, offset + i * ReadIDWritable.LENGTH);
-    }
-    
-    public int getCountOfPosition() {
-        return valueCount;
-    }
-
-    public byte[] getByteArray() {
-        return storage;
-    }
-
-    public int getStartOffset() {
-        return offset;
-    }
-
-    public int getLength() {
-        return valueCount * ReadIDWritable.LENGTH;
-    }
-    
-    @Override
-    public Iterator<ReadIDWritable> iterator() {
-        Iterator<ReadIDWritable> it = new Iterator<ReadIDWritable>() {
-
-            private int currentIndex = 0;
-
-            @Override
-            public boolean hasNext() {
-                return currentIndex < valueCount;
-            }
-
-            @Override
-            public ReadIDWritable next() {
-                return getPosition(currentIndex++);
-            }
-
-            @Override
-            public void remove() {
-                if(currentIndex < valueCount)
-                    System.arraycopy(storage, offset + currentIndex * ReadIDWritable.LENGTH, 
-                          storage, offset + (currentIndex - 1) * ReadIDWritable.LENGTH, 
-                          (valueCount - currentIndex) * ReadIDWritable.LENGTH);
-                valueCount--;
-                currentIndex--;
-            }
-        };
-        return it;
-    }
-    
-    /*
-     * remove the first instance of @toRemove. Uses a linear scan.  Throws an exception if not in this list.
-     */
-    public void remove(ReadIDWritable toRemove, boolean ignoreMissing) {
-        Iterator<ReadIDWritable> posIterator = this.iterator();
-        while (posIterator.hasNext()) {
-            if(toRemove.equals(posIterator.next())) {
-                posIterator.remove();
-                return;
-            }
-        }
-        if (!ignoreMissing) {
-            throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
-        }
-    }
-    
-    public void remove(ReadIDWritable toRemove) {
-        remove(toRemove, false);
-    }
-    
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(valueCount);
-        out.write(storage, offset, valueCount * ReadIDWritable.LENGTH);
-    }
-    
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        this.valueCount = in.readInt();
-        setSize(valueCount * ReadIDWritable.LENGTH);
-        in.readFully(storage, offset, valueCount * ReadIDWritable.LENGTH);
-    }
-    
-    @Override
-    public String toString() {
-        StringBuilder sbuilder = new StringBuilder();
-        sbuilder.append('[');
-        for (ReadIDWritable pos : this) {
-            sbuilder.append(pos.toString());
-            sbuilder.append(',');
-        }
-        if (valueCount > 0) {
-            sbuilder.setCharAt(sbuilder.length() - 1, ']');
-        } else {
-            sbuilder.append(']');
-        }
-        return sbuilder.toString();
-    }
-    
-    @Override
-    public int hashCode() {
-        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof PositionListWritable))
-            return false;
-        PositionListWritable other = (PositionListWritable) o;
-        if (this.valueCount != other.valueCount)
-            return false;
-        for (int i=0; i < this.valueCount; i++) {
-                if (!this.getPosition(i).equals(other.getPosition(i)))
-                    return false;
-        }
-        return true;
-    }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
deleted file mode 100644
index b6c829a..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.data.Marshal;
-
-public class ReadIDWritable implements WritableComparable<ReadIDWritable>, Serializable{
-    private static final long serialVersionUID = 1L;
-    protected byte[] storage;
-    protected int offset;
-    public static final int LENGTH = 6;
-    
-    public static final int totalBits = 48;
-    private static final int bitsForMate = 1;
-    private static final int readIdShift = bitsForMate;
-    
-    public ReadIDWritable() {
-        storage = new byte[LENGTH];
-        offset = 0;
-    }
-    
-    public ReadIDWritable(byte mateId, long readId){
-        this();
-        set(mateId, readId);
-    }
-    
-    public ReadIDWritable(byte[] storage, int offset) {
-        setNewReference(storage, offset);
-    }
-    
-    public void set(long readIDByte){
-        Marshal.putLong(readIDByte, storage, offset);
-    }
-    
-    public static long serialize(byte mateId, long readId) {
-        return (readId << 1) + (mateId & 0b1);
-    }
-    
-    public void set(byte mateId, long readId){
-        Marshal.putLong(serialize(mateId, readId), storage, offset);
-    }
-    
-    public void set(ReadIDWritable pos) {
-        set(pos.getMateId(),pos.getReadId());
-    }
-    
-    public void setNewReference(byte[] storage, int offset) {
-        this.storage = storage;
-        this.offset = offset;
-    }
-    
-    public void reset(){
-        storage = new byte[LENGTH];
-        offset = 0;
-    }
-    
-    public long deserialize(){
-        return Marshal.getLong(storage, offset);
-    }
-    
-    public byte getMateId(){
-        return (byte) (Marshal.getLong(storage, offset) & 0b1);
-    }
-    
-    public long getReadId(){
-        return Marshal.getLong(storage, offset) >>> readIdShift;
-    }
-    
-    public byte[] getByteArray() {
-        return storage;
-    }
-
-    public int getStartOffset() {
-        return offset;
-    }
-
-    public int getLength() {
-        return LENGTH;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        in.readFully(storage, offset, LENGTH);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.write(storage, offset, LENGTH);
-    }
-    
-    @Override
-    public int hashCode() {
-        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ReadIDWritable))
-            return false;
-        ReadIDWritable other = (ReadIDWritable) o;
-        return this.deserialize() == other.deserialize();
-    }
-    
-    @Override
-    public int compareTo(ReadIDWritable other) {
-        return (this.deserialize() < other.deserialize()) ? -1 : ((this.deserialize() == other.deserialize()) ? 0 : 1);
-    }
-    
-    /*
-     * String of form "(readId-posID_mate)" where mate is _1 or _2
-     */
-    @Override
-    public String toString() {
-        return "(" + this.getReadId() + "_" + (this.getMateId() + 1) + ")";
-    }
-}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
index c31ca6d..71246a1 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
@@ -24,13 +24,13 @@
             String randomString = generateString(i);
             byte[] array = randomString.getBytes();
             kmer.setByRead(array, 0);
-            kmerList.reset();
+            kmerList.reset(kmer.getKmerLength());
             kmerList.append(kmer);
             Assert.assertEquals(kmerList.getPosition(0).toString(), randomString);
             Assert.assertEquals(1, kmerList.getCountOfPosition());
         }
         
-        kmerList.reset();
+        kmerList.reset(kmer.getKmerLength());
         //add one more kmer each time and fix kmerSize
         for (int i = 0; i < 200; i++) {
             kmer = new KmerBytesWritable(5);
diff --git a/genomix/genomix-hadoop/data/webmap/test.txt b/genomix/genomix-hadoop/data/webmap/test.txt
new file mode 100644
index 0000000..17770fa
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/test.txt
@@ -0,0 +1 @@
+1	AATAGAAG
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
new file mode 100644
index 0000000..3723ed9
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+
+@SuppressWarnings("deprecation")
+public class GenomixDriver {
+    
+    private static class Options {
+        @Option(name = "-inputpath", usage = "the input path", required = true)
+        public String inputPath;
+
+        @Option(name = "-outputpath", usage = "the output path", required = true)
+        public String outputPath;
+
+        @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+        public int numReducers;
+
+        @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+        public int sizeKmer;
+        
+        @Option(name = "-read-length", usage = "the length of read", required = true)
+        public int readLength;
+    }
+    
+    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength,
+            boolean seqOutput, String defaultConfPath) throws IOException{
+        JobConf conf = new JobConf(GenomixDriver.class);
+        conf.setInt("sizeKmer", sizeKmer);
+        conf.setInt("readLength", readLength);
+        if (defaultConfPath != null) {
+            conf.addResource(new Path(defaultConfPath));
+        }
+
+        conf.setJobName("Genomix Graph Building");
+        conf.setMapperClass(GenomixMapper.class);
+        conf.setReducerClass(GenomixReducer.class);
+
+        conf.setMapOutputKeyClass(KmerBytesWritable.class);
+        conf.setMapOutputValueClass(NodeWritable.class);
+        
+        //InputFormat and OutputFormat for Reducer
+        conf.setInputFormat(TextInputFormat.class);
+        if (seqOutput == true)
+            conf.setOutputFormat(SequenceFileOutputFormat.class);
+        else
+            conf.setOutputFormat(TextOutputFormat.class);
+        
+        //Output Key/Value Class
+        conf.setOutputKeyClass(KmerBytesWritable.class);
+        conf.setOutputValueClass(NodeWritable.class);
+        
+        FileInputFormat.setInputPaths(conf, new Path(inputPath));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        conf.setNumReduceTasks(numReducers);
+
+        FileSystem dfs = FileSystem.get(conf);
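+        // delete any existing output directory so the job can be re-run over the same path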
+        dfs.delete(new Path(outputPath), true);
+        JobClient.runJob(conf);
+    }
+    
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+        GenomixDriver driver = new GenomixDriver();
+        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, 
+                options.readLength, true, null);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
new file mode 100644
index 0000000..3b615cb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -0,0 +1,242 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GenomixMapper extends MapReduceBase implements
+    Mapper<LongWritable, Text, KmerBytesWritable, NodeWritable>{
+    
+    public static enum KmerDir{
+        FORWARD,
+        REVERSE,
+    }
+    
+    public static int KMER_SIZE;
+    private KmerBytesWritable preForwardKmer;
+    private KmerBytesWritable preReverseKmer;
+    private KmerBytesWritable curForwardKmer;
+    private KmerBytesWritable curReverseKmer;
+    private KmerBytesWritable nextForwardKmer;
+    private KmerBytesWritable nextReverseKmer;
+    private PositionWritable nodeId;
+    private PositionListWritable nodeIdList;
+    private KmerListWritable edgeListForPreKmer;
+    private KmerListWritable edgeListForNextKmer;
+    private NodeWritable outputNode;
+    
+    private KmerDir preKmerDir;
+    private KmerDir curKmerDir;
+    private KmerDir nextKmerDir;
+    
+    byte mateId = (byte)0;
+    
+    @Override
+    public void configure(JobConf job) {
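+        // all per-record writables are allocated once here and reused for every map() call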
+        KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+        preForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        preReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        curForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        curReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        nextForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        nextReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        nodeId = new PositionWritable();
+        nodeIdList = new PositionListWritable();
+        edgeListForPreKmer = new KmerListWritable(KMER_SIZE);
+        edgeListForNextKmer = new KmerListWritable(KMER_SIZE);
+        outputNode = new NodeWritable(KMER_SIZE);
+        preKmerDir = KmerDir.FORWARD;
+        curKmerDir = KmerDir.FORWARD;
+        nextKmerDir = KmerDir.FORWARD;
+    }
+    
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, NodeWritable> output,
+            Reporter reporter) throws IOException {
+        /** parse the input line: <readID> \t <sequence> */
+        String[] rawLine = value.toString().split("\\t"); // split the line into read id and gene sequence
+        if (rawLine.length != 2) {
+            throw new IOException("invalid data");
+        }
+        int readID = Integer.parseInt(rawLine[0]);
+        String geneLine = rawLine[1];
+        Pattern genePattern = Pattern.compile("[AGCT]+");
+        Matcher geneMatcher = genePattern.matcher(geneLine);
+        boolean isValid = geneMatcher.matches();
+        if (isValid == true) {
+            byte[] array = geneLine.getBytes();
+            if (KMER_SIZE >= array.length) {
+                throw new IOException("short read");
+            }
+            
+            /** first kmer **/
+            outputNode.reset(KMER_SIZE);
+            curForwardKmer.setByRead(array, 0);
+            curReverseKmer.setByReadReverse(array, 0);
+            curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+            setNextKmer(array[KMER_SIZE]);
+            //set value.nodeId
+            setNodeId(mateId, readID, 1);
+            //set value.edgeList
+            setEdgeListForNextKmer();
+            //output mapper result
+            setMapperOutput(output);
+            
+            /** middle kmer **/
+            for (int i = KMER_SIZE + 1; i < array.length; i++) {
+                outputNode.reset(KMER_SIZE);
+            	setPreKmerByOldCurKmer();
+            	setCurKmerByOldNextKmer();
+            	setNextKmer(array[i]);
+        	    //set value.nodeId
+            	setNodeId(mateId, readID, i - KMER_SIZE + 1);
+                //set value.edgeList
+                setEdgeListForPreKmer();
+                setEdgeListForNextKmer();
+                //output mapper result
+                setMapperOutput(output);
+            }
+            
+            /** last kmer **/
+            outputNode.reset(KMER_SIZE);
+        	setPreKmerByOldCurKmer();
+        	setCurKmerByOldNextKmer();
+        	//set value.nodeId
+        	setNodeId(mateId, readID, array.length - KMER_SIZE + 1);
+            //set value.edgeList
+            setEdgeListForPreKmer();
+            //output mapper result
+            setMapperOutput(output);
+        }
+    }
+    
+    public void setNodeId(byte mateId, long readID, int posId){
+        nodeId.set(mateId, readID, posId);
+        nodeIdList.reset();
+        nodeIdList.append(nodeId);
+        outputNode.setNodeIdList(nodeIdList);
+    }
+    
+    public void setEdgeListForPreKmer(){
+    	switch(curKmerDir){
+    		case FORWARD:
+    			switch(preKmerDir){
+    				case FORWARD:
+    				    edgeListForPreKmer.reset(KMER_SIZE);
+    				    edgeListForPreKmer.append(preForwardKmer);
+    					outputNode.setRRList(edgeListForPreKmer);
+    					break;
+    				case REVERSE:
+    				    edgeListForPreKmer.reset(KMER_SIZE);
+    				    edgeListForPreKmer.append(preReverseKmer);
+    					outputNode.setRFList(edgeListForPreKmer);
+    					break;
+    			}
+    			break;
+    		case REVERSE:
+    			switch(preKmerDir){
+    				case FORWARD:
+    				    edgeListForPreKmer.reset(KMER_SIZE);
+    				    edgeListForPreKmer.append(preForwardKmer);
+    					outputNode.setFRList(edgeListForPreKmer);
+    					break;
+    				case REVERSE:
+    				    edgeListForPreKmer.reset(KMER_SIZE);
+    				    edgeListForPreKmer.append(preReverseKmer);
+    					outputNode.setFFList(edgeListForPreKmer);
+    					break;
+    			}
+    			break;
+    	}
+    }
+    
+    public void setEdgeListForNextKmer(){
+    	switch(curKmerDir){
+    		case FORWARD:
+    			switch(nextKmerDir){
+    				case FORWARD:
+    					edgeListForNextKmer.reset(KMER_SIZE);
+    					edgeListForNextKmer.append(nextForwardKmer);
+    					outputNode.setFFList(edgeListForNextKmer);
+    					break;
+    				case REVERSE:
+    					edgeListForNextKmer.reset(KMER_SIZE);
+    					edgeListForNextKmer.append(nextReverseKmer);
+    					outputNode.setFRList(edgeListForNextKmer);
+    					break;
+    			}
+    			break;
+    		case REVERSE:
+    			switch(nextKmerDir){
+    				case FORWARD:
+    					edgeListForNextKmer.reset(KMER_SIZE);
+    					edgeListForNextKmer.append(nextForwardKmer);
+    					outputNode.setRFList(edgeListForNextKmer);
+    					break;
+    				case REVERSE:
+    					edgeListForNextKmer.reset(KMER_SIZE);
+    					edgeListForNextKmer.append(nextReverseKmer);
+    					outputNode.setRRList(edgeListForNextKmer);
+    					break;
+    			}
+    			break;
+    	}
+    }
+    
+    //set preKmer by shifting curKmer with preChar
+    public void setPreKmer(byte preChar){
+        preForwardKmer.set(curForwardKmer);
+        preForwardKmer.shiftKmerWithPreChar(preChar);
+        preReverseKmer.setByReadReverse(preForwardKmer.toString().getBytes(), preForwardKmer.getOffset());
+        preKmerDir = preForwardKmer.compareTo(preReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+    }
+    
+    //set nextKmer by shifting curKmer with nextChar
+    public void setNextKmer(byte nextChar){
+        nextForwardKmer.set(curForwardKmer);
+        nextForwardKmer.shiftKmerWithNextChar(nextChar);
+        nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
+        nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+    }
+    
+    //old curKmer becomes current preKmer
+    public void setPreKmerByOldCurKmer(){
+    	preKmerDir = curKmerDir;
+    	preForwardKmer.set(curForwardKmer);
+    	preReverseKmer.set(curReverseKmer);
+    }
+    
+    //old nextKmer becomes current curKmer
+    public void setCurKmerByOldNextKmer(){
+    	curKmerDir = nextKmerDir;
+    	curForwardKmer.set(nextForwardKmer);
+    	curReverseKmer.set(nextReverseKmer);
+    }
+    
+    public void setMapperOutput(OutputCollector<KmerBytesWritable, NodeWritable> output) throws IOException{
+    	switch(curKmerDir){
+    	case FORWARD:
+    		output.collect(curForwardKmer, outputNode);
+    		break;
+    	case REVERSE:
+    		output.collect(curReverseKmer, outputNode);
+    		break;
+    }
+   }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
new file mode 100644
index 0000000..6472f05
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class GenomixReducer extends MapReduceBase implements
+	Reducer<KmerBytesWritable, NodeWritable, KmerBytesWritable, NodeWritable>{
+    
+    private NodeWritable outputNode = new NodeWritable();
+    private NodeWritable tmpNode = new NodeWritable();
+	@Override
+	public void reduce(KmerBytesWritable key, Iterator<NodeWritable> values,
+			OutputCollector<KmerBytesWritable, NodeWritable> output,
+			Reporter reporter) throws IOException {
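+		// merge every mapper output that shares this kmer by concatenating its nodeId and edge lists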
+		outputNode.reset(0);
+		
+//		//copy first item to outputNode
+//		if(values.hasNext()){
+//		    NodeWritable tmpNode = values.next();
+//		    outputNode.set(tmpNode);
+//		}
+		while (values.hasNext()) {
+		    tmpNode.set(values.next());
+		    outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
+		    outputNode.getFFList().appendList(tmpNode.getFFList());
+		    outputNode.getFRList().appendList(tmpNode.getFRList());
+		    outputNode.getRFList().appendList(tmpNode.getRFList());
+		    outputNode.getRRList().appendList(tmpNode.getRRList());
+		}
+		output.collect(key,outputNode);
+	}
+
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
new file mode 100644
index 0000000..4716072
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -0,0 +1,82 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
+
+@SuppressWarnings("deprecation")
+public class GraphBuildingTest {
+
+    private JobConf conf = new JobConf();
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private static final String DATA_PATH = "data/webmap/test.txt";
+    private static final String HDFS_PATH = "/webmap";
+    private static final String RESULT_PATH = "/result";
+    
+//    private static final int COUNT_REDUCER = 2;
+    private static final int SIZE_KMER = 5;
+    private static final int READ_LENGTH = 8;
+    
+    private MiniDFSCluster dfsCluster;
+    private MiniMRCluster mrCluster;
+    private FileSystem dfs;
+    
+    @Test
+    public void test() throws Exception {
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHadoop();
+        TestMapKmerToNode();
+        cleanupHadoop();
+    }
+    
+    public void TestMapKmerToNode() throws Exception {
+        GenomixDriver driver = new GenomixDriver();
+        driver.run(HDFS_PATH, RESULT_PATH, 0, SIZE_KMER, READ_LENGTH, true, HADOOP_CONF_PATH);
+        dumpResult();
+    }
+    
+    private void startHadoop() throws IOException {
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, 1, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(1, dfs.getUri().toString(), 1);
+
+        Path src = new Path(DATA_PATH);
+        Path dest = new Path(HDFS_PATH + "/");
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
+
+    private void cleanupHadoop() throws IOException {
+        mrCluster.shutdown();
+        dfsCluster.shutdown();
+    }
+    
+    private void dumpResult() throws IOException {
+        Path src = new Path(RESULT_PATH);
+        Path dest = new Path(ACTUAL_RESULT_DIR);
+        dfs.copyToLocalFile(src, dest);
+        HadoopMiniClusterTest.copyResultsToLocal(RESULT_PATH, "test.txt", false, conf, true, dfs);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index d4ae5dd..0f2d714 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -69,12 +69,19 @@
             Configuration conf) throws IOException {
     	copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, conf, true);
     }
+    
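+    // convenience overload that forwards to the FileSystem-parameterized version below using this class's dfs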
+    public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+        copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText,
+                conf, ignoreZeroOutputs, dfs);
+    }
+    
     /*
      * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
      * Also locally store the binary-formatted result if available.
      */
-    protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
-            Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+    public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf, boolean ignoreZeroOutputs, FileSystem dfs) throws IOException {
         if (resultsAreText) {
             // for text files, just concatenate them together
             FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
index cec18bd..d9efd75 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
@@ -18,14 +18,20 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
 import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -36,13 +42,19 @@
      * 
      */
     private static final long serialVersionUID = 1L;
-
+    private PositionListWritable nodeIdList = new PositionListWritable();
+    private KmerListWritable forwardForwardList = new KmerListWritable(kmerSize);// TODO: how should kmerSize be obtained here?
+    private KmerListWritable forwardReverseList = new KmerListWritable(kmerSize);
+    private KmerListWritable reverseForwardList = new KmerListWritable(kmerSize);
+    private KmerListWritable reverseReverseList = new KmerListWritable(kmerSize);
+    private KmerBytesWritable kmer  = new KmerBytesWritable(kmerSize);
+    
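+    // assumed tuple layout (see init/aggregate below): field 1 = kmer, 2 = nodeId list, 3/4 = FF count/list, 5/6 = FR, 7/8 = RF, 9/10 = RR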
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
         return new IAggregatorDescriptor() {
-            private PositionReference position = new PositionReference();
+            private NodeWritable nodeAggreter = new NodeWritable();// TODO: could this be declared outside the anonymous class?
 
             protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -62,27 +74,65 @@
 
             @Override
             public AggregateState createAggregateStates() {
-                return new AggregateState(new ArrayBackedValueStorage());
+                return new AggregateState(new NodeWritable());
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                inputVal.reset();
-                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
-                inputVal.append(position);
-
+                NodeWritable inputVal = (NodeWritable) state.state;
+                inputVal.reset(kmerSize);
+                nodeIdList.reset();
+                forwardForwardList.reset(kmerSize);
+                forwardReverseList.reset(kmerSize);
+                reverseForwardList.reset(kmerSize);
+                reverseReverseList.reset(kmerSize);
+                
+                kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO: verify that field indices really start at 1
+                nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+                int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
+                forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+                int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+                forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+                int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+                reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
+                int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+                reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
+                nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+                
+                inputVal.getKmer().set(kmer);
+                inputVal.getNodeIdList().appendList(nodeIdList);
+                inputVal.getFFList().appendList(forwardForwardList);
+                inputVal.getFRList().appendList(forwardReverseList);
+                inputVal.getRFList().appendList(reverseForwardList);
+                inputVal.getRRList().appendList(reverseReverseList);
+                
                 // make an empty field
-                tupleBuilder.addFieldEndOffset();
+                tupleBuilder.addFieldEndOffset();// TODO: why is this empty field needed?
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
-                inputVal.append(position);
+                NodeWritable inputVal = (NodeWritable) state.state;
+                kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO: verify that field indices really start at 1
+                nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+                int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
+                forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+                int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+                forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+                int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+                reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
+                int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+                reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
+                nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+                
+                inputVal.getKmer().set(kmer);
+                inputVal.getNodeIdList().appendList(nodeIdList);
+                inputVal.getFFList().appendList(forwardForwardList);
+                inputVal.getFRList().appendList(forwardReverseList);
+                inputVal.getRFList().appendList(reverseForwardList);
+                inputVal.getRRList().appendList(reverseReverseList);
             }
 
             @Override
@@ -95,11 +145,52 @@
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                NodeWritable inputVal = (NodeWritable) state.state;
                 try {
-                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+//                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
 
+                    tupleBuilder.addFieldEndOffset();// TODO: why is this needed?
+                    //-------------------------------------------------------
+//                    tupleBuilder.reset();
+                    fieldOutput.write(inputVal.getKmer().getBytes(), inputVal.getKmer().getOffset(), inputVal.getKmer().getLength());
+                    
+                    tupleBuilder.addField(inputVal.getNodeIdList().getByteArray(), inputVal.getNodeIdList().getStartOffset(), inputVal.getNodeIdList().getLength());
+
+                    tupleBuilder.getDataOutput().writeInt(inputVal.getFFList().getCountOfPosition());
                     tupleBuilder.addFieldEndOffset();
+
+                    tupleBuilder.addField(inputVal.getFFList().getByteArray(), inputVal.getFFList().getStartOffset(), inputVal.getFFList()
+                            .getLength());
+
+                    tupleBuilder.getDataOutput().writeInt(inputVal.getFRList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+
+                    tupleBuilder.addField(inputVal.getFRList().getByteArray(), inputVal.getFRList().getStartOffset(), inputVal.getFRList()
+                            .getLength());
+
+                    tupleBuilder.getDataOutput().writeInt(inputVal.getRFList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+
+                    tupleBuilder.addField(inputVal.getRFList().getByteArray(), inputVal.getRFList().getStartOffset(), inputVal.getRFList()
+                            .getLength());
+
+                    tupleBuilder.getDataOutput().writeInt(inputVal.getRRList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+
+                    tupleBuilder.addField(inputVal.getRRList().getByteArray(), inputVal.getRRList().getStartOffset(), inputVal.getRRList()
+                            .getLength());
+
+/*                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy an record into a frame: the record kmerByteSize is too large.");
+                        }
+                    }*/
+
                 } catch (IOException e) {
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
deleted file mode 100644
index cc759a4..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
-
-    private static final long serialVersionUID = 1L;
-    public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
-    
-    private KmerListWritable forwardForwardList;
-    private KmerListWritable forwardReverseList;
-    private KmerListWritable reverseForwardList;
-    private KmerListWritable reverseReverseList;
-    private ReadIDWritable readId;
-    public IntermediateNodeWritable(){
-        forwardForwardList = new KmerListWritable();
-        forwardReverseList = new KmerListWritable();
-        reverseForwardList = new KmerListWritable();
-        reverseReverseList = new KmerListWritable();
-        readId = new ReadIDWritable();
-    }
-    
-    public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
-        this();
-        set(FFList, FRList, RFList, RRList, uniqueKey);
-    }
-    
-    public void set(IntermediateNodeWritable node){
-        set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList, 
-                node.reverseReverseList, node.readId);
-    }
-    
-    public void set(KmerListWritable FFList, KmerListWritable FRList,
-            KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
-        this.forwardForwardList.set(FFList);
-        this.forwardReverseList.set(FRList);
-        this.reverseForwardList.set(RFList);
-        this.reverseReverseList.set(RRList);
-        this.readId.set(uniqueKey);
-    }
-
-    public void reset(int kmerSize) {
-        forwardForwardList.reset();
-        forwardReverseList.reset();
-        reverseForwardList.reset();
-        reverseReverseList.reset();
-        readId.reset();
-    }
-    
-    public KmerListWritable getFFList() {
-        return forwardForwardList;
-    }
-
-    public void setFFList(KmerListWritable forwardForwardList) {
-        this.forwardForwardList.set(forwardForwardList);
-    }
-
-    public KmerListWritable getFRList() {
-        return forwardReverseList;
-    }
-
-    public void setFRList(KmerListWritable forwardReverseList) {
-        this.forwardReverseList.set(forwardReverseList);
-    }
-
-    public KmerListWritable getRFList() {
-        return reverseForwardList;
-    }
-
-    public void setRFList(KmerListWritable reverseForwardList) {
-        this.reverseForwardList.set(reverseForwardList);
-    }
-
-    public KmerListWritable getRRList() {
-        return reverseReverseList;
-    }
-
-    public void setRRList(KmerListWritable reverseReverseList) {
-        this.reverseReverseList.set(reverseReverseList);
-    }
-
-	public ReadIDWritable getreadId() {
-        return readId;
-    }
-
-    public void setreadId(ReadIDWritable readId) {
-        this.readId.set(readId);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        this.forwardForwardList.readFields(in);
-        this.forwardReverseList.readFields(in);
-        this.reverseForwardList.readFields(in);
-        this.reverseReverseList.readFields(in);
-        this.readId.readFields(in);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        this.forwardForwardList.write(out);
-        this.forwardReverseList.write(out);
-        this.reverseForwardList.write(out);
-        this.reverseReverseList.write(out);
-        this.readId.write(out);
-    }
-
-    @Override
-    public int compareTo(IntermediateNodeWritable other) {
-        // TODO Auto-generated method stub
-        return this.readId.compareTo(other.readId);
-    }
-    
-    @Override
-    public int hashCode() {
-        return this.readId.hashCode();
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof IntermediateNodeWritable) {
-            IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
-            return (this.forwardForwardList.equals(nw.forwardForwardList)
-                    && this.forwardReverseList.equals(nw.forwardReverseList)
-                    && this.reverseForwardList.equals(nw.reverseForwardList)
-                    && this.reverseReverseList.equals(nw.reverseReverseList) && (this.readId.equals(nw.readId)));
-        }
-        return false;
-    }
-    
-    @Override
-    public String toString() {
-        StringBuilder sbuilder = new StringBuilder();
-        sbuilder.append('(');
-        sbuilder.append(readId.toString()).append('\t');
-        sbuilder.append(forwardForwardList.toString()).append('\t');
-        sbuilder.append(forwardReverseList.toString()).append('\t');
-        sbuilder.append(reverseForwardList.toString()).append('\t');
-        sbuilder.append(reverseReverseList.toString()).append('\t').append(')');
-        return sbuilder.toString();
-    }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
index 2579029..b1b22ac 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
@@ -308,12 +308,27 @@
                     
                     tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
                     
+                    tupleBuilder.getDataOutput().writeInt(Node.getFFList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+                    
                     tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
                             .getLength());
+                    
+                    tupleBuilder.getDataOutput().writeInt(Node.getFRList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+                    
                     tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
                             .getLength());
+                    
+                    tupleBuilder.getDataOutput().writeInt(Node.getRFList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+                    
                     tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
                             .getLength());
+                    
+                    tupleBuilder.getDataOutput().writeInt(Node.getRRList().getCountOfPosition());
+                    tupleBuilder.addFieldEndOffset();
+                    
                     tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
                             .getLength());
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 3084722..3650553 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -24,7 +24,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
-import edu.uci.ics.genomix.type.IntermediateNodeWritable;
+import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
index c4e7063..6026ac1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -20,8 +20,8 @@
 
 import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
 import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerListWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index bd761a5..4ce59a0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -75,8 +75,8 @@
 
     @Test
     public void TestAll() throws Exception {
-        TestReader();
-//        TestGroupbyKmer();
+//        TestReader();
+        TestGroupbyKmer();
 //        TestMapKmerToRead();
 //        TestGroupByReadID();
 //        TestEndToEnd();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index dd37a6a..1a793cd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -39,13 +39,6 @@
      * initiate kmerSize, maxIteration
      */
     public void initVertex() {
-//        if (kmerSize == -1)
-//            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-//        if (maxIteration < 0)
-//            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-//        outFlag = (byte)0;
-//        outgoingMsg.reset();
-//        headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
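+        // body left empty: the commented-out initialization above was removed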
     }
     
     /**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 59104f7..05a4700 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -102,8 +102,6 @@
     /**
      * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
      */
-
-
     protected boolean setNextInfo(VertexValueWritable value) {
         if (value.getFFList().getCountOfPosition() > 0) {
             nextID.set(value.getFFList().getPosition(0));
@@ -121,7 +119,6 @@
     /**
      * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
      */
-     
     protected boolean setPrevInfo(VertexValueWritable value) {
         if (value.getRRList().getCountOfPosition() > 0) {
             prevID.set(value.getRRList().getPosition(0));
@@ -169,8 +166,7 @@
                     } else if (hasPrev && !prevHead) {
                         // compress this head to the reverse tail
                         sendUpdateMsgToSuccessor();
-                    } //else
-                        //voteToHalt();
+                    } 
                 }
             }else {
                     // I'm a tail
@@ -179,24 +175,20 @@
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
                             sendUpdateMsgToPredecessor();
-                        } //else
-                            //voteToHalt();
+                        }
                     } else if (!hasPrev) {
                         // no previous node
                         if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
                             sendUpdateMsgToPredecessor();
-                        } //else
-                            //voteToHalt();
+                        }
                     } else if (!hasNext) {
                         // no next node
                         if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
                             sendUpdateMsgToSuccessor();
-                        } //else
-                            //voteToHalt();
-                    } //else
-                        //voteToHalt();
+                        }
+                    }
                 }
         }
         else if (getSuperstep() % 4 == 0){