Apply undirected graph algorithm in p2
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 2c4ac0d..291839c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,17 +31,31 @@
 
     // merge/update directions
     public static class DirectionFlag {
-        public static final byte DIR_FF = 0b00 << 0;
-        public static final byte DIR_FR = 0b01 << 0;
-        public static final byte DIR_RF = 0b10 << 0;
-        public static final byte DIR_RR = 0b11 << 0;
-        public static final byte DIR_MASK = 0b11 << 0;
+        public static final byte DIR_FF = 0b000 << 0;
+        public static final byte DIR_FR = 0b001 << 0;
+        public static final byte DIR_RF = 0b010 << 0;
+        public static final byte DIR_RR = 0b011 << 0;
+        public static final byte DIR_NO = 0b111 << 0;
+        public static final byte DIR_MASK = 0b111 << 0;
     }
     
-    public static class MergeDirFlag extends DirectionFlag{
-        public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2;
-        public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2;
-        public static final byte SHOULD_MERGE_MASK = 0b1 << 2;
+    public static class SpecialVertexFlag extends DirectionFlag {
+        public static final byte IS_RANDOMTAIL = 0b00 << 4;
+        public static final byte IS_STOP = 0b00 << 4;
+        public static final byte IS_HEAD = 0b01 << 4;
+        public static final byte IS_FINAL = 0b10 << 4;
+        public static final byte IS_RANDOMHEAD = 0b11 << 4;
+        public static final byte IS_OLDHEAD = 0b11 << 4;
+
+        public static final byte VERTEX_MASK = 0b11 << 4;
+        public static final byte VERTEX_CLEAR = (byte) 11001111;
+    }
+    
+    public static class MergeDirFlag extends SpecialVertexFlag{
+        public static final byte NO_MERGE = 0b00 << 2;
+        public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 2;
+        public static final byte SHOULD_MERGEWITHPREV = 0b10 << 2;
+        public static final byte SHOULD_MERGE_MASK = 0b11 << 2;
     }
 
     private PositionWritable nodeID;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 8620445..ebcf7f0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -143,7 +143,7 @@
                 return;
             }
         }
-        throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+        //throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
     }
 
     public void set(PositionListWritable list2) {
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-0 b/genomix/genomix-pregelix/data/input/pathmerge/part-0
new file mode 100755
index 0000000..56056ab
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-1 b/genomix/genomix-pregelix/data/input/pathmerge/part-1
new file mode 100755
index 0000000..95701ce
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-2 b/genomix/genomix-pregelix/data/input/pathmerge/part-2
new file mode 100755
index 0000000..6bb6633
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-3 b/genomix/genomix-pregelix/data/input/pathmerge/part-3
new file mode 100755
index 0000000..caa63de
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-0 b/genomix/genomix-pregelix/data/input/read2/part-0
new file mode 100755
index 0000000..1620187
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-1 b/genomix/genomix-pregelix/data/input/read2/part-1
new file mode 100755
index 0000000..d2e2476
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-2 b/genomix/genomix-pregelix/data/input/read2/part-2
new file mode 100755
index 0000000..7f3575e
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-3 b/genomix/genomix-pregelix/data/input/read2/part-3
new file mode 100755
index 0000000..03e23e1
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index c4365ff..9e2e5f0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,6 +8,7 @@
 
 import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -35,7 +36,8 @@
         @Override
         public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
                 throws IOException, InterruptedException {
-            getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+            if(vertex.getVertexValue().getState() != MessageFlag.IS_OLDHEAD)
+                getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
         }
     }
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 162663f..c8d1852 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -29,6 +29,8 @@
     protected Iterator<PositionWritable> posIterator;
     byte headFlag;
     protected byte outFlag;
+    protected byte inFlag;
+    protected byte selfFlag;
     
     /**
      * initiate kmerSize, maxIteration
@@ -207,8 +209,10 @@
     public void setSuccessorAdjMsg(){
         if(getVertexValue().getFFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_FF;
-        else
+        else if(getVertexValue().getFRList().getLength() > 0)
             outFlag |= MessageFlag.DIR_FR;
+        else
+            outFlag |= MessageFlag.DIR_NO;
     }
     
     /**
@@ -217,8 +221,10 @@
     public void setPredecessorAdjMsg(){
         if(getVertexValue().getRFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_RF;
-        else
+        else if(getVertexValue().getRRList().getLength() > 0)
             outFlag |= MessageFlag.DIR_RR;
+        else
+            outFlag |= MessageFlag.DIR_NO;
     }
     
     /**
@@ -257,14 +263,14 @@
      * @throws IOException 
      */
     public void sendMergeMsg(){
-        if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+        if(selfFlag == MessageFlag.IS_HEAD){
             byte newState = getVertexValue().getState(); 
             newState &= ~MessageFlag.IS_HEAD;
             newState |= MessageFlag.IS_OLDHEAD;
             getVertexValue().setState(newState);
+            resetSelfFlag();
             outFlag |= MessageFlag.IS_HEAD;
-            voteToHalt();
-        } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+        } else if(selfFlag == MessageFlag.IS_OLDHEAD){
             outFlag |= MessageFlag.IS_OLDHEAD;
             voteToHalt();
         }
@@ -301,7 +307,7 @@
      * @throws IOException 
      */
     public void broadcastMergeMsg(){
-        if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+        if(headFlag > 0)
             outFlag |= MessageFlag.IS_HEAD;
         switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
             case MessageFlag.SHOULD_MERGEWITHNEXT:
@@ -313,6 +319,7 @@
                 outgoingMsg.setSourceVertexId(getVertexId());
                 outgoingMsg.setChainVertexId(getVertexValue().getKmer());
                 sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+                deleteVertex(getVertexId());
                 break;
             case MessageFlag.SHOULD_MERGEWITHPREV:
                 setPredecessorAdjMsg();
@@ -323,6 +330,7 @@
                 outgoingMsg.setSourceVertexId(getVertexId());
                 outgoingMsg.setChainVertexId(getVertexValue().getKmer());
                 sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+                deleteVertex(getVertexId());
                 break; 
         }
     }
@@ -418,11 +426,18 @@
      * merge and updateAdjList merge with one neighbor
      */
     public void processMerge(){
-        byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        inFlag = incomingMsg.getFlag();
+        byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
         byte neighborToMeDir = mirrorDirection(meToNeighborDir);
         
+        if((inFlag & MessageFlag.IS_HEAD) > 0){
+            byte state = getVertexValue().getState();
+            state |= MessageFlag.IS_HEAD;
+            getVertexValue().setState(state);
+        }
+        
         boolean flip;
-        if((outFlag & MessageFlag.FLIP) > 0)
+        if((inFlag & MessageFlag.FLIP) > 0)
             flip = true;
         else
             flip = false;
@@ -452,6 +467,50 @@
                 kmerSize, msg.getKmer());
     }
     
+    /**
+     * set head state
+     */
+    public void setHeadState(){
+        byte state = getVertexValue().getState();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_HEAD;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * set final state
+     */
+    public void setFinalState(){
+        byte state = getVertexValue().getState();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_FINAL;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * set final state
+     */
+    public void setStopFlag(){
+        byte state = incomingMsg.getFlag();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_STOP;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * get Vertex state
+     */
+    public byte getMsgFlag(){
+        return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
+    }
+    
+    /**
+     * reset selfFlag
+     */
+    public void resetSelfFlag(){
+        selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
+    }
+    
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 429282b..500a903 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -44,10 +44,8 @@
     BasicPathMergeVertex {
 
     private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
-    protected MessageWritable outgoingMsg2 = new MessageWritable();
-    protected PositionWritable destVertexId2 = new PositionWritable();
-    
-    byte finalFlag;
+    PositionWritable tempPostition = new PositionWritable();
+
     /**
      * initiate kmerSize, maxIteration
      */
@@ -57,7 +55,9 @@
         if (maxIteration < 0)
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
         headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+        selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
         outgoingMsg.reset();
+        receivedMsgList.clear();
     }
 
     /**
@@ -65,16 +65,22 @@
      */
     public void sendOutMsg() {
         //send wantToMerge to next
-        destVertexId.set(getNextDestVertexId(getVertexValue()));
-        outgoingMsg.setFlag(outFlag);
-        outgoingMsg.setSourceVertexId(getVertexId());
-        sendMsg(destVertexId, outgoingMsg);
+        tempPostition = getNextDestVertexId(getVertexValue());
+        if(tempPostition != null){
+            destVertexId.set(tempPostition);
+            outgoingMsg.setFlag(outFlag);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            sendMsg(destVertexId, outgoingMsg);
+        }
         
         ////send wantToMerge to prev
-        destVertexId2.set(getPreDestVertexId(getVertexValue()));
-        outgoingMsg2.setFlag(outFlag);
-        outgoingMsg2.setSourceVertexId(getVertexId());
-        sendMsg(destVertexId2, outgoingMsg2);
+        tempPostition = getPreDestVertexId(getVertexValue());
+        if(tempPostition != null){
+            destVertexId.set(tempPostition);
+            outgoingMsg.setFlag(outFlag);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            sendMsg(destVertexId, outgoingMsg);
+        }
     }
     
     /**
@@ -84,88 +90,96 @@
         int countHead = 0;
         int countOldHead = 0;
         for(int i = 0; i < receivedMsgList.size(); i++){
-            if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
-                countHead++;
-            if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
-                countOldHead++;
+            inFlag = receivedMsgList.get(i).getFlag();
+            switch(inFlag & MessageFlag.VERTEX_MASK){
+                case MessageFlag.IS_HEAD:
+                    countHead++;
+                    break;
+                case MessageFlag.IS_OLDHEAD:
+                    countOldHead++;
+                    break;
+            }
         }
-        if(countHead == 0 && countOldHead == 0)
-            return MessageFromHead.BothMsgsFromNonHead;
-        else if(countHead == 2)
+        if(countHead == 2)
             return MessageFromHead.BothMsgsFromHead;
-        else if(countOldHead == 2)
-            return MessageFromHead.BothMsgsFromOldHead;
-        else if(countHead == 1 && headFlag == 0)
-            return MessageFromHead.OneMsgFromHeadToNonHead;
-        else if(countHead == 1 && headFlag > 0)
-            return MessageFromHead.OneMsgFromHeadToHead;
-        else if(countOldHead == 1)
-            return MessageFromHead.OneMsgFromNonHead;
-        
-        return MessageFromHead.NO_INFO;
+        else if(countHead == 1 && countOldHead == 1)
+            return MessageFromHead.OneMsgFromOldHeadAndOneFromHead;
+        else if(countHead == 1 && countOldHead == 0)
+            return MessageFromHead.OneMsgFromHeadAndOneFromNonHead;
+        else if(countHead == 0 && countOldHead == 0)
+            return MessageFromHead.BothMsgsFromNonHead;
+        else
+            return MessageFromHead.NO_MSG;
     }
 
     /**
      * head send message to path
      */
     public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
-        //process merge when receiving msg
-        while (msgIterator.hasNext()) {
-            incomingMsg = msgIterator.next();
-            receivedMsgList.add(incomingMsg);
-        }
-        if(receivedMsgList.size() != 0){
-            byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
-            switch(numOfMsgsFromHead){
-                case MessageFromHead.BothMsgsFromNonHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    break;
-                case MessageFromHead.BothMsgsFromHead:
-                case MessageFromHead.BothMsgsFromOldHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_FINAL);
-                    break;
-                case MessageFromHead.OneMsgFromHeadToNonHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_HEAD);
-                    break;
-                case MessageFromHead.OneMsgFromHeadToHead: //stop condition
-                    for(int i = 0; i < 2; i++){
-                        if(headFlag > 0){
-                            processMerge(receivedMsgList.get(i));
-                            break;
-                        }
-                    }
-                    getVertexValue().setState(MessageFlag.IS_FINAL);
-                    //voteToHalt();
-                    break;
-                case MessageFromHead.OneMsgFromNonHead:
-                    //halt
-                    //voteToHalt();
-                    break;
-            }
-        }
         //send out wantToMerge msg
-        resetHeadFlag();
-        finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
-        outFlag = (byte)(headFlag | finalFlag);
-        if(outFlag == 0)
-            sendOutMsg();
+        if(selfFlag != MessageFlag.IS_HEAD){
+                sendOutMsg();
+        }
     }
 
     /**
      * path response message to head
      */
     public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
+        if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
+            getVertexValue().setState(MessageFlag.IS_STOP);
+            sendOutMsg();
+        }
         while (msgIterator.hasNext()) {
             incomingMsg = msgIterator.next();
-            sendMergeMsg();
+            if(getMsgFlag() == MessageFlag.IS_FINAL){
+                processMerge(incomingMsg);
+                getVertexValue().setState(MessageFlag.IS_FINAL);
+            }else
+                sendMergeMsg();
         }
     }
 
+    /**
+     * head vertex process merge
+     */
+    public void processMergeInHeadVertex(Iterator<MessageWritable> msgIterator){
+      //process merge when receiving msg
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            if(getMsgFlag() == MessageFlag.IS_FINAL){
+                setStopFlag();
+                sendMergeMsg();
+                break;
+            }
+            receivedMsgList.add(incomingMsg);
+        }
+        if(receivedMsgList.size() != 0){
+            byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+             switch(numOfMsgsFromHead){
+                case MessageFromHead.BothMsgsFromHead:
+                case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    getVertexValue().setState(MessageFlag.IS_FINAL);
+                    voteToHalt();
+                    break;
+                case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    getVertexValue().setState(MessageFlag.IS_HEAD);
+                    break;
+                case MessageFromHead.BothMsgsFromNonHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    break;
+                case MessageFromHead.NO_MSG:
+                    //halt
+                    deleteVertex(getVertexId());
+                    break;
+            }
+        }
+    }
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
         initVertex();
@@ -173,13 +187,17 @@
             startSendMsg();
         else if (getSuperstep() == 2)
             initState(msgIterator);
-        else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
+        else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
             sendMsgToPathVertex(msgIterator);
-            if(headFlag == 0)
+            if(selfFlag != MessageFlag.IS_HEAD)
                 voteToHalt();
-        } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
+        } else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
             responseMsgToHeadVertex(msgIterator);
-        } else
+            if(selfFlag != MessageFlag.IS_HEAD)
+                voteToHalt();
+        } else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
+            processMergeInHeadVertex(msgIterator);
+        }else
             voteToHalt();
     }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index ef699fc..f70a76e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -12,7 +12,6 @@
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.type.State;
 
 /*
  * vertexId: BytesWritable
@@ -62,8 +61,6 @@
     private boolean curHead;
     private boolean nextHead;
     private boolean prevHead;
-    private byte headFlag;
-    private byte tailFlag;
     private byte selfFlag;
     
     /**
@@ -86,6 +83,9 @@
         nextHead = false;
         prevHead = false;
         outFlag = (byte)0;
+        inFlag = (byte)0;
+        // Node may be marked as head b/c it's a real head or a real tail
+        headFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
         outgoingMsg.reset();
     }
 
@@ -139,20 +139,22 @@
         else if (getSuperstep() == 2)
             initState(msgIterator);
         else if (getSuperstep() % 4 == 3){
-            // Node may be marked as head b/c it's a real head or a real tail
-            headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
-            tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
-            outFlag = (byte) (headFlag | tailFlag);
+
+            //tailFlag = (byte) (MessageFlag.IS_TAIL & getVertexValue().getState());
+            //outFlag = (byte) (headFlag | tailFlag);
+            outFlag |= headFlag;
+            outFlag |= MessageFlag.NO_MERGE;
             
             // only PATH vertices are present. Find the ID's for my neighbors
             curID.set(getVertexId());
             
             curHead = isNodeRandomHead(curID);
             
+            
             // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
             // We prevent merging towards non-path nodes
-            hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
-            hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+            hasNext = setNextInfo(getVertexValue());//&& headFlag == 0;
+            hasPrev = setPrevInfo(getVertexValue());//&& headFlag == 0;
             if (hasNext || hasPrev) {
                 if (curHead) {
                     if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
@@ -165,27 +167,26 @@
                 } else {
                     // I'm a tail
                     if (hasNext && hasPrev) {
-                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
+                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
                             sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasPrev) {
                         // no previous node
-                        if (!nextHead && curID.compareTo(nextID) > 0) {
+                        if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
                             sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasNext) {
                         // no next node
-                        if (!prevHead && curID.compareTo(prevID) > 0) {
+                        if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
                             sendUpdateMsgToSuccessor();
                         }
                     }
                 }
             }
-            voteToHalt();
         }
         else if (getSuperstep() % 4 == 0){
             //update neighber
@@ -196,17 +197,16 @@
         } else if (getSuperstep() % 4 == 1){
             //send message to the merge object and kill self
             broadcastMergeMsg();
-            deleteVertex(getVertexId());
         } else if (getSuperstep() % 4 == 2){
             //merge kmer
             while (msgIterator.hasNext()) {
                 incomingMsg = msgIterator.next();
+                selfFlag = (byte) (MessageFlag.VERTEX_MASK & getVertexValue().getState());
                 processMerge();
                 
                 //head meets head, stop
-                headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
-                selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
-                if((headFlag & selfFlag) > 0)
+                headFlag = (byte) (MessageFlag.VERTEX_MASK & incomingMsg.getFlag());
+                if(headFlag == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
                     voteToHalt();
             }
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 27eb40a..d68c98b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -243,7 +243,7 @@
         if (incomingMsg.getFlag() == Message.START) {
             getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
         } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
-            getVertexValue().setState(MessageFlag.IS_TAIL);
+            getVertexValue().setState(MessageFlag.IS_HEAD);
             getVertexValue().setKmer(getVertexValue().getKmer());
             //voteToHalt();
         } //else
@@ -445,7 +445,7 @@
             // We prevent merging towards non-path nodes
             hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
             hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
-            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
                 getVertexValue().setState(outFlag);
                 voteToHalt();
             }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index c130ec6..855deaa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -2,21 +2,14 @@
 
 import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
 
-public class MessageFlag extends MergeDirFlag{
+public class MessageFlag extends MergeDirFlag {
     
-    public static final byte FLIP = 1 << 3;
-    public static final byte IS_HEAD = 1 << 4;
-    public static final byte IS_TAIL = 1 << 5;
-    public static final byte IS_OLDHEAD = 1 << 6;
-    public static final byte IS_FINAL = 0b000011;
+    public static final byte FLIP = 1 << 6;
+
     
     public static String getFlagAsString(byte code) {
         // TODO: allow multiple flags to be set
         switch (code) {
-            case IS_HEAD:
-                return "IS_HEAD";
-            case IS_OLDHEAD:
-                return "IS_OLDHEAD";
             case FLIP:
                 return "FLIP";
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
index f343c2e..05a2f95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -1,13 +1,17 @@
 package edu.uci.ics.genomix.pregelix.type;
 
 public class MessageFromHead {
-    public static final byte BothMsgsFromHead = 1 << 0;
-    public static final byte BothMsgsFromNonHead = 1 << 1;
-    public static final byte BothMsgsFromOldHead = 1 << 2;
-    public static final byte OneMsgFromHead = 1 << 3;
-    public static final byte OneMsgFromNonHead = 1 << 4;
-    public static final byte OneMsgFromHeadToNonHead = 1 << 5;
-    public static final byte OneMsgFromHeadToHead = 1 << 6;
+    public static final byte BothMsgsFromHead = 0b0000 << 1;
+    public static final byte BothMsgsFromNonHead = 0b0001 << 1;
+    public static final byte BothMsgsFromOldHead = 0b0010 << 1;
+    public static final byte OneMsgFromHead = 0b0011 << 1;
+    public static final byte OneMsgFromNonHead = 0b0100 << 1;
+    public static final byte OneMsgFromHeadAndOneFromNonHead = 0b0101 << 1;
+    public static final byte OneMsgFromHeadToHead = 0b0110 << 1;
+    public static final byte OneMsgFromOldHeadToNonHead = 0b0111 << 1;
+    public static final byte OneMsgFromOldHeadToHead = 0b1000 << 1;
+    public static final byte OneMsgFromOldHeadAndOneFromHead = 0b1001 << 1;
+    public static final byte NO_MSG = 0b1010 << 1;
     
     public static final byte NO_INFO = 0 << 0;
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 5a113e0..0a366db 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -201,7 +201,7 @@
     
     public static void main(String[] args) throws IOException {
         //genNaiveAlgorithmForMergeGraph();
-        //genLogAlgorithmForMergeGraph();
+        genLogAlgorithmForMergeGraph();
         //genP3ForMergeGraph();
         //genTipAddGraph();
         //genTipRemoveGraph();
@@ -209,7 +209,7 @@
         //genBridgeRemoveGraph();
         //genBubbleAddGraph();
         //genBubbleMergeGraph();
-        genP4ForMergeGraph();
+        //genP4ForMergeGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 355bc7a..7bf5aa3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
     private static final int KmerSize = 3;
-    private static final int ReadLength = 7;
+    private static final int ReadLength = 8;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
+    private static final String DATA_INPUT_PATH = "data/graphbuild.test/read2.txt";
     private static final String HDFS_INPUT_PATH = "/webmap";
     private static final String HDFS_OUTPUT_PATH = "/webmap_result";