test cases
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index ef2dc9f..6870ff1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,17 +31,31 @@
 
     // merge/update directions
     public static class DirectionFlag {
-        public static final byte DIR_FF = 0b00 << 0;
-        public static final byte DIR_FR = 0b01 << 0;
-        public static final byte DIR_RF = 0b10 << 0;
-        public static final byte DIR_RR = 0b11 << 0;
-        public static final byte DIR_MASK = 0b11 << 0;
+        public static final byte DIR_FF = 0b000 << 0;
+        public static final byte DIR_FR = 0b001 << 0;
+        public static final byte DIR_RF = 0b010 << 0;
+        public static final byte DIR_RR = 0b011 << 0;
+        public static final byte DIR_NO = 0b111 << 0;
+        public static final byte DIR_MASK = 0b111 << 0;
     }
     
-    public static class MergeDirFlag extends DirectionFlag{
-        public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2;
-        public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2;
-        public static final byte SHOULD_MERGE_MASK = 0b1 << 2;
+    public static class SpecialVertexFlag extends DirectionFlag {
+        public static final byte IS_RANDOMTAIL = 0b00 << 4;
+        public static final byte IS_STOP = 0b00 << 4;
+        public static final byte IS_HEAD = 0b01 << 4;
+        public static final byte IS_FINAL = 0b10 << 4;
+        public static final byte IS_RANDOMHEAD = 0b11 << 4;
+        public static final byte IS_OLDHEAD = 0b11 << 4;
+
+        public static final byte VERTEX_MASK = 0b11 << 4;
+        public static final byte VERTEX_CLEAR = (byte) 11001111;
+    }
+    
+    public static class MergeDirFlag extends SpecialVertexFlag{
+        public static final byte NO_MERGE = 0b00 << 2;
+        public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 2;
+        public static final byte SHOULD_MERGEWITHPREV = 0b10 << 2;
+        public static final byte SHOULD_MERGE_MASK = 0b11 << 2;
     }
 
     private PositionWritable nodeID;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 8620445..ebcf7f0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -143,7 +143,7 @@
                 return;
             }
         }
-        throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+        //throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
     }
 
     public void set(PositionListWritable list2) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 2f12606..3d93baa 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -485,6 +485,16 @@
         FileSystem dfs = FileSystem.get(conf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
+<<<<<<< HEAD
+        
+        //another comment
+
+        FileInputFormat.addInputPaths(conf, inputPath);
+        Path outputPath = new Path(inputPath + ".h4merge.tmp");
+        FileOutputFormat.setOutputPath(conf, outputPath);
+
+=======
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
         conf.setInputFormat(SequenceFileInputFormat.class);
         conf.setOutputFormat(SequenceFileOutputFormat.class);
         conf.setMapOutputKeyClass(PositionWritable.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 98fe5f6..2765920 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -112,6 +112,28 @@
                 break;
             }
         }
+<<<<<<< HEAD
+        
+        // finally, combine all the completed paths and update messages to
+        // create a single merged graph output
+        dfs.delete(new Path(outputGraphPath), true); // clear any previous
+                                                     // output
+        // use all the "complete" and "update" outputs in addition to the final
+        // (possibly empty) toMerge directories
+        // as input to the final update step. This builds a comma-delim'ed
+        // String of said files.
+        final String lastMergeOutput = mergeOutput;
+        PathFilter updateFilter = new PathFilter() {
+            @Override
+            public boolean accept(Path arg0) {
+                String path = arg0.toString();
+                System.out.println("equals last: " + path + " vs " + lastMergeOutput + " = " + path.endsWith(lastMergeOutput));
+                return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
+            }
+        };
+        // test comment 
+        
+=======
         if (!mergeComplete) {
             // if the merge didn't finish, we have to do one final iteration to convert back into (NodeWritable, NullWritable) pairs
             ConvertGraphFromNodeWithFlagToNodeWritable converter = new ConvertGraphFromNodeWithFlagToNodeWritable();
@@ -120,6 +142,7 @@
         }
 
         // final output string is a comma-separated list of completeOutputs
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
         StringBuilder sb = new StringBuilder();
         String delim = "";
         for (String output : completeOutputs) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 4dcff2d..3af7069 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -113,6 +113,7 @@
     public void processUpdates(NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
         byte updateFlag = updateMsg.getFlag();
         NodeWritable updateNode = updateMsg.getNode();
+        
         if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
             // this message wants to update the edges of node.
             // remove position and merge its position lists with node
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-0 b/genomix/genomix-pregelix/data/input/pathmerge/part-0
new file mode 100755
index 0000000..56056ab
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-1 b/genomix/genomix-pregelix/data/input/pathmerge/part-1
new file mode 100755
index 0000000..95701ce
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-2 b/genomix/genomix-pregelix/data/input/pathmerge/part-2
new file mode 100755
index 0000000..6bb6633
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-3 b/genomix/genomix-pregelix/data/input/pathmerge/part-3
new file mode 100755
index 0000000..caa63de
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-0 b/genomix/genomix-pregelix/data/input/read2/part-0
new file mode 100755
index 0000000..1620187
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-1 b/genomix/genomix-pregelix/data/input/read2/part-1
new file mode 100755
index 0000000..d2e2476
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-2 b/genomix/genomix-pregelix/data/input/read2/part-2
new file mode 100755
index 0000000..7f3575e
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-3 b/genomix/genomix-pregelix/data/input/read2/part-3
new file mode 100755
index 0000000..03e23e1
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index c4365ff..9e2e5f0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,6 +8,7 @@
 
 import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -35,7 +36,8 @@
         @Override
         public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
                 throws IOException, InterruptedException {
-            getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+            if(vertex.getVertexValue().getState() != MessageFlag.IS_OLDHEAD)
+                getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
         }
     }
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9238f9d..c8d1852 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -29,6 +29,8 @@
     protected Iterator<PositionWritable> posIterator;
     byte headFlag;
     protected byte outFlag;
+    protected byte inFlag;
+    protected byte selfFlag;
     
     /**
      * initiate kmerSize, maxIteration
@@ -53,27 +55,32 @@
      * get destination vertex
      */
     public PositionWritable getNextDestVertexId(VertexValueWritable value) {
-        if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+        if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
             posIterator = value.getFFList().iterator();
             outFlag |= MessageFlag.DIR_FF;
-        }
-        else{ // #FRList() > 0
+            return posIterator.next();
+        } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
             posIterator = value.getFRList().iterator();
             outFlag |= MessageFlag.DIR_FR;
+            return posIterator.next();
+        } else {
+          return null;  
         }
-        return posIterator.next();
+        
     }
 
     public PositionWritable getPreDestVertexId(VertexValueWritable value) {
-        if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+        if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
             posIterator = value.getRFList().iterator();
             outFlag |= MessageFlag.DIR_RF;
-        }
-        else{ // #RRList() > 0
+            return posIterator.next();
+        } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
             posIterator = value.getRRList().iterator();
             outFlag |= MessageFlag.DIR_RR;
+            return posIterator.next();
+        } else {
+            return null;
         }
-        return posIterator.next();
     }
 
     /**
@@ -202,8 +209,10 @@
     public void setSuccessorAdjMsg(){
         if(getVertexValue().getFFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_FF;
-        else
+        else if(getVertexValue().getFRList().getLength() > 0)
             outFlag |= MessageFlag.DIR_FR;
+        else
+            outFlag |= MessageFlag.DIR_NO;
     }
     
     /**
@@ -212,8 +221,10 @@
     public void setPredecessorAdjMsg(){
         if(getVertexValue().getRFList().getLength() > 0)
             outFlag |= MessageFlag.DIR_RF;
-        else
+        else if(getVertexValue().getRRList().getLength() > 0)
             outFlag |= MessageFlag.DIR_RR;
+        else
+            outFlag |= MessageFlag.DIR_NO;
     }
     
     /**
@@ -223,7 +234,7 @@
     public void broadcastUpdateMsg(){
         if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
             outFlag |= MessageFlag.IS_HEAD;
-        switch(getVertexValue().getState() & 0b0001){
+        switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){
             case MessageFlag.SHOULD_MERGEWITHPREV:
                 setSuccessorAdjMsg();
                 if(ifFlipWithPredecessor())
@@ -231,7 +242,8 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+                if(getNextDestVertexId(getVertexValue()) != null)
+                    sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
                 break;
             case MessageFlag.SHOULD_MERGEWITHNEXT:
                 setPredecessorAdjMsg();
@@ -240,7 +252,8 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+                if(getPreDestVertexId(getVertexValue()) != null)
+                    sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
                 break; 
         }
     }
@@ -250,14 +263,14 @@
      * @throws IOException 
      */
     public void sendMergeMsg(){
-        if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+        if(selfFlag == MessageFlag.IS_HEAD){
             byte newState = getVertexValue().getState(); 
             newState &= ~MessageFlag.IS_HEAD;
             newState |= MessageFlag.IS_OLDHEAD;
             getVertexValue().setState(newState);
+            resetSelfFlag();
             outFlag |= MessageFlag.IS_HEAD;
-            voteToHalt();
-        } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+        } else if(selfFlag == MessageFlag.IS_OLDHEAD){
             outFlag |= MessageFlag.IS_OLDHEAD;
             voteToHalt();
         }
@@ -294,7 +307,7 @@
      * @throws IOException 
      */
     public void broadcastMergeMsg(){
-        if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+        if(headFlag > 0)
             outFlag |= MessageFlag.IS_HEAD;
         switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
             case MessageFlag.SHOULD_MERGEWITHNEXT:
@@ -306,6 +319,7 @@
                 outgoingMsg.setSourceVertexId(getVertexId());
                 outgoingMsg.setChainVertexId(getVertexValue().getKmer());
                 sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+                deleteVertex(getVertexId());
                 break;
             case MessageFlag.SHOULD_MERGEWITHPREV:
                 setPredecessorAdjMsg();
@@ -316,6 +330,7 @@
                 outgoingMsg.setSourceVertexId(getVertexId());
                 outgoingMsg.setChainVertexId(getVertexValue().getKmer());
                 sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+                deleteVertex(getVertexId());
                 break; 
         }
     }
@@ -324,7 +339,7 @@
      * This vertex tries to merge with next vertex and send update msg to neighber
      * @throws IOException 
      */
-    public void sendUpMsgToPredecessor(){
+    public void sendUpdateMsgToPredecessor(){
         byte state = getVertexValue().getState();
         state |= MessageFlag.SHOULD_MERGEWITHNEXT;
         getVertexValue().setState(state);
@@ -339,7 +354,7 @@
      * This vertex tries to merge with next vertex and send update msg to neighber
      * @throws IOException 
      */
-    public void sendUpMsgToSuccessor(){
+    public void sendUpdateMsgToSuccessor(){
         byte state = getVertexValue().getState();
         state |= MessageFlag.SHOULD_MERGEWITHPREV;
         getVertexValue().setState(state);
@@ -411,11 +426,18 @@
      * merge and updateAdjList merge with one neighbor
      */
     public void processMerge(){
-        byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        inFlag = incomingMsg.getFlag();
+        byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
         byte neighborToMeDir = mirrorDirection(meToNeighborDir);
         
+        if((inFlag & MessageFlag.IS_HEAD) > 0){
+            byte state = getVertexValue().getState();
+            state |= MessageFlag.IS_HEAD;
+            getVertexValue().setState(state);
+        }
+        
         boolean flip;
-        if((outFlag & MessageFlag.FLIP) > 0)
+        if((inFlag & MessageFlag.FLIP) > 0)
             flip = true;
         else
             flip = false;
@@ -445,6 +467,50 @@
                 kmerSize, msg.getKmer());
     }
     
+    /**
+     * set head state
+     */
+    public void setHeadState(){
+        byte state = getVertexValue().getState();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_HEAD;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * set final state
+     */
+    public void setFinalState(){
+        byte state = getVertexValue().getState();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_FINAL;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * set final state
+     */
+    public void setStopFlag(){
+        byte state = incomingMsg.getFlag();
+        state &= MessageFlag.VERTEX_CLEAR;
+        state |= MessageFlag.IS_STOP;
+        getVertexValue().setState(state);
+    }
+    
+    /**
+     * get Vertex state
+     */
+    public byte getMsgFlag(){
+        return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
+    }
+    
+    /**
+     * reset selfFlag
+     */
+    public void resetSelfFlag(){
+        selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
+    }
+    
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 429282b..500a903 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -44,10 +44,8 @@
     BasicPathMergeVertex {
 
     private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
-    protected MessageWritable outgoingMsg2 = new MessageWritable();
-    protected PositionWritable destVertexId2 = new PositionWritable();
-    
-    byte finalFlag;
+    PositionWritable tempPostition = new PositionWritable();
+
     /**
      * initiate kmerSize, maxIteration
      */
@@ -57,7 +55,9 @@
         if (maxIteration < 0)
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
         headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+        selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
         outgoingMsg.reset();
+        receivedMsgList.clear();
     }
 
     /**
@@ -65,16 +65,22 @@
      */
     public void sendOutMsg() {
         //send wantToMerge to next
-        destVertexId.set(getNextDestVertexId(getVertexValue()));
-        outgoingMsg.setFlag(outFlag);
-        outgoingMsg.setSourceVertexId(getVertexId());
-        sendMsg(destVertexId, outgoingMsg);
+        tempPostition = getNextDestVertexId(getVertexValue());
+        if(tempPostition != null){
+            destVertexId.set(tempPostition);
+            outgoingMsg.setFlag(outFlag);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            sendMsg(destVertexId, outgoingMsg);
+        }
         
         ////send wantToMerge to prev
-        destVertexId2.set(getPreDestVertexId(getVertexValue()));
-        outgoingMsg2.setFlag(outFlag);
-        outgoingMsg2.setSourceVertexId(getVertexId());
-        sendMsg(destVertexId2, outgoingMsg2);
+        tempPostition = getPreDestVertexId(getVertexValue());
+        if(tempPostition != null){
+            destVertexId.set(tempPostition);
+            outgoingMsg.setFlag(outFlag);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            sendMsg(destVertexId, outgoingMsg);
+        }
     }
     
     /**
@@ -84,88 +90,96 @@
         int countHead = 0;
         int countOldHead = 0;
         for(int i = 0; i < receivedMsgList.size(); i++){
-            if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
-                countHead++;
-            if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
-                countOldHead++;
+            inFlag = receivedMsgList.get(i).getFlag();
+            switch(inFlag & MessageFlag.VERTEX_MASK){
+                case MessageFlag.IS_HEAD:
+                    countHead++;
+                    break;
+                case MessageFlag.IS_OLDHEAD:
+                    countOldHead++;
+                    break;
+            }
         }
-        if(countHead == 0 && countOldHead == 0)
-            return MessageFromHead.BothMsgsFromNonHead;
-        else if(countHead == 2)
+        if(countHead == 2)
             return MessageFromHead.BothMsgsFromHead;
-        else if(countOldHead == 2)
-            return MessageFromHead.BothMsgsFromOldHead;
-        else if(countHead == 1 && headFlag == 0)
-            return MessageFromHead.OneMsgFromHeadToNonHead;
-        else if(countHead == 1 && headFlag > 0)
-            return MessageFromHead.OneMsgFromHeadToHead;
-        else if(countOldHead == 1)
-            return MessageFromHead.OneMsgFromNonHead;
-        
-        return MessageFromHead.NO_INFO;
+        else if(countHead == 1 && countOldHead == 1)
+            return MessageFromHead.OneMsgFromOldHeadAndOneFromHead;
+        else if(countHead == 1 && countOldHead == 0)
+            return MessageFromHead.OneMsgFromHeadAndOneFromNonHead;
+        else if(countHead == 0 && countOldHead == 0)
+            return MessageFromHead.BothMsgsFromNonHead;
+        else
+            return MessageFromHead.NO_MSG;
     }
 
     /**
      * head send message to path
      */
     public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
-        //process merge when receiving msg
-        while (msgIterator.hasNext()) {
-            incomingMsg = msgIterator.next();
-            receivedMsgList.add(incomingMsg);
-        }
-        if(receivedMsgList.size() != 0){
-            byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
-            switch(numOfMsgsFromHead){
-                case MessageFromHead.BothMsgsFromNonHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    break;
-                case MessageFromHead.BothMsgsFromHead:
-                case MessageFromHead.BothMsgsFromOldHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_FINAL);
-                    break;
-                case MessageFromHead.OneMsgFromHeadToNonHead:
-                    for(int i = 0; i < 2; i++)
-                        processMerge(receivedMsgList.get(i));
-                    getVertexValue().setState(MessageFlag.IS_HEAD);
-                    break;
-                case MessageFromHead.OneMsgFromHeadToHead: //stop condition
-                    for(int i = 0; i < 2; i++){
-                        if(headFlag > 0){
-                            processMerge(receivedMsgList.get(i));
-                            break;
-                        }
-                    }
-                    getVertexValue().setState(MessageFlag.IS_FINAL);
-                    //voteToHalt();
-                    break;
-                case MessageFromHead.OneMsgFromNonHead:
-                    //halt
-                    //voteToHalt();
-                    break;
-            }
-        }
         //send out wantToMerge msg
-        resetHeadFlag();
-        finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
-        outFlag = (byte)(headFlag | finalFlag);
-        if(outFlag == 0)
-            sendOutMsg();
+        if(selfFlag != MessageFlag.IS_HEAD){
+                sendOutMsg();
+        }
     }
 
     /**
      * path response message to head
      */
     public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
+        if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
+            getVertexValue().setState(MessageFlag.IS_STOP);
+            sendOutMsg();
+        }
         while (msgIterator.hasNext()) {
             incomingMsg = msgIterator.next();
-            sendMergeMsg();
+            if(getMsgFlag() == MessageFlag.IS_FINAL){
+                processMerge(incomingMsg);
+                getVertexValue().setState(MessageFlag.IS_FINAL);
+            }else
+                sendMergeMsg();
         }
     }
 
+    /**
+     * head vertex process merge
+     */
+    public void processMergeInHeadVertex(Iterator<MessageWritable> msgIterator){
+      //process merge when receiving msg
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            if(getMsgFlag() == MessageFlag.IS_FINAL){
+                setStopFlag();
+                sendMergeMsg();
+                break;
+            }
+            receivedMsgList.add(incomingMsg);
+        }
+        if(receivedMsgList.size() != 0){
+            byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+             switch(numOfMsgsFromHead){
+                case MessageFromHead.BothMsgsFromHead:
+                case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    getVertexValue().setState(MessageFlag.IS_FINAL);
+                    voteToHalt();
+                    break;
+                case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    getVertexValue().setState(MessageFlag.IS_HEAD);
+                    break;
+                case MessageFromHead.BothMsgsFromNonHead:
+                    for(int i = 0; i < 2; i++)
+                        processMerge(receivedMsgList.get(i));
+                    break;
+                case MessageFromHead.NO_MSG:
+                    //halt
+                    deleteVertex(getVertexId());
+                    break;
+            }
+        }
+    }
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
         initVertex();
@@ -173,13 +187,17 @@
             startSendMsg();
         else if (getSuperstep() == 2)
             initState(msgIterator);
-        else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
+        else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
             sendMsgToPathVertex(msgIterator);
-            if(headFlag == 0)
+            if(selfFlag != MessageFlag.IS_HEAD)
                 voteToHalt();
-        } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
+        } else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
             responseMsgToHeadVertex(msgIterator);
-        } else
+            if(selfFlag != MessageFlag.IS_HEAD)
+                voteToHalt();
+        } else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
+            processMergeInHeadVertex(msgIterator);
+        }else
             voteToHalt();
     }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index f250afc..f70a76e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,14 +1,10 @@
 package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.hadoop.io.NullWritable;
-
 import edu.uci.ics.genomix.type.PositionWritable;
 
-import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
@@ -16,8 +12,6 @@
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 
 /*
  * vertexId: BytesWritable
@@ -55,7 +49,7 @@
     public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
     public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
 
-    private static long randSeed = -1;
+    private static long randSeed = 1;
     private float probBeingRandomHead = -1;
     private Random randGenerator;
     
@@ -67,8 +61,6 @@
     private boolean curHead;
     private boolean nextHead;
     private boolean prevHead;
-    private byte headFlag;
-    private byte tailFlag;
     private byte selfFlag;
     
     /**
@@ -79,8 +71,9 @@
             kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
         if (maxIteration < 0)
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-        if (randSeed < 0)
-            randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        //if (randSeed < 0)
+        //    randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        randSeed = getSuperstep();
         randGenerator = new Random(randSeed);
         if (probBeingRandomHead < 0)
             probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
@@ -90,12 +83,17 @@
         nextHead = false;
         prevHead = false;
         outFlag = (byte)0;
+        inFlag = (byte)0;
+        // Node may be marked as head b/c it's a real head or a real tail
+        headFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
         outgoingMsg.reset();
     }
 
     protected boolean isNodeRandomHead(PositionWritable nodeID) {
         // "deterministically random", based on node id
-        randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+        //randGenerator.setSeed(randSeed);
+        //randSeed = randGenerator.nextInt();
+        randGenerator.setSeed((randSeed ^ nodeID.hashCode()) * 100000);//randSeed + nodeID.hashCode()
         return randGenerator.nextFloat() < probBeingRandomHead;
     }
 
@@ -141,48 +139,50 @@
         else if (getSuperstep() == 2)
             initState(msgIterator);
         else if (getSuperstep() % 4 == 3){
-            // Node may be marked as head b/c it's a real head or a real tail
-            headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
-            tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
-            outFlag = (byte) (headFlag | tailFlag);
+
+            //tailFlag = (byte) (MessageFlag.IS_TAIL & getVertexValue().getState());
+            //outFlag = (byte) (headFlag | tailFlag);
+            outFlag |= headFlag;
+            outFlag |= MessageFlag.NO_MERGE;
             
             // only PATH vertices are present. Find the ID's for my neighbors
             curID.set(getVertexId());
             
             curHead = isNodeRandomHead(curID);
             
+            
             // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
             // We prevent merging towards non-path nodes
-            hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
-            hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+            hasNext = setNextInfo(getVertexValue());//&& headFlag == 0;
+            hasPrev = setPrevInfo(getVertexValue());//&& headFlag == 0;
             if (hasNext || hasPrev) {
                 if (curHead) {
-                    if (hasNext && !nextHead) {
+                    if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
                         // compress this head to the forward tail
-                        sendUpMsgToPredecessor(); //TODO up -> update  From -> to
-                    } else if (hasPrev && !prevHead) {
+                        sendUpdateMsgToPredecessor(); //TODO up -> update  From -> to
+                    } else if (hasPrev && !prevHead && (getPreDestVertexId(getVertexValue()) != null)) {
                         // compress this head to the reverse tail
-                        sendUpMsgToSuccessor();
+                        sendUpdateMsgToSuccessor();
                     }
                 } else {
                     // I'm a tail
                     if (hasNext && hasPrev) {
-                        if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
-                            sendUpMsgToPredecessor();
+                            sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasPrev) {
                         // no previous node
                         if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
-                            sendUpMsgToPredecessor();
+                            sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasNext) {
                         // no next node
                         if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
-                            sendUpMsgToSuccessor();
+                            sendUpdateMsgToSuccessor();
                         }
                     }
                 }
@@ -197,17 +197,16 @@
         } else if (getSuperstep() % 4 == 1){
             //send message to the merge object and kill self
             broadcastMergeMsg();
-            deleteVertex(getVertexId());
         } else if (getSuperstep() % 4 == 2){
             //merge kmer
             while (msgIterator.hasNext()) {
                 incomingMsg = msgIterator.next();
+                selfFlag = (byte) (MessageFlag.VERTEX_MASK & getVertexValue().getState());
                 processMerge();
                 
                 //head meets head, stop
-                headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
-                selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
-                if((headFlag & selfFlag) > 0)
+                headFlag = (byte) (MessageFlag.VERTEX_MASK & incomingMsg.getFlag());
+                if(headFlag == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
                     voteToHalt();
             }
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 27eb40a..d68c98b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -243,7 +243,7 @@
         if (incomingMsg.getFlag() == Message.START) {
             getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
         } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
-            getVertexValue().setState(MessageFlag.IS_TAIL);
+            getVertexValue().setState(MessageFlag.IS_HEAD);
             getVertexValue().setKmer(getVertexValue().getKmer());
             //voteToHalt();
         } //else
@@ -445,7 +445,7 @@
             // We prevent merging towards non-path nodes
             hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
             hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
-            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
                 getVertexValue().setState(outFlag);
                 voteToHalt();
             }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index c130ec6..855deaa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -2,21 +2,14 @@
 
 import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
 
-public class MessageFlag extends MergeDirFlag{
+public class MessageFlag extends MergeDirFlag {
     
-    public static final byte FLIP = 1 << 3;
-    public static final byte IS_HEAD = 1 << 4;
-    public static final byte IS_TAIL = 1 << 5;
-    public static final byte IS_OLDHEAD = 1 << 6;
-    public static final byte IS_FINAL = 0b000011;
+    public static final byte FLIP = 1 << 6;
+
     
     public static String getFlagAsString(byte code) {
         // TODO: allow multiple flags to be set
         switch (code) {
-            case IS_HEAD:
-                return "IS_HEAD";
-            case IS_OLDHEAD:
-                return "IS_OLDHEAD";
             case FLIP:
                 return "FLIP";
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
index f343c2e..05a2f95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -1,13 +1,17 @@
 package edu.uci.ics.genomix.pregelix.type;
 
 public class MessageFromHead {
-    public static final byte BothMsgsFromHead = 1 << 0;
-    public static final byte BothMsgsFromNonHead = 1 << 1;
-    public static final byte BothMsgsFromOldHead = 1 << 2;
-    public static final byte OneMsgFromHead = 1 << 3;
-    public static final byte OneMsgFromNonHead = 1 << 4;
-    public static final byte OneMsgFromHeadToNonHead = 1 << 5;
-    public static final byte OneMsgFromHeadToHead = 1 << 6;
+    public static final byte BothMsgsFromHead = 0b0000 << 1;
+    public static final byte BothMsgsFromNonHead = 0b0001 << 1;
+    public static final byte BothMsgsFromOldHead = 0b0010 << 1;
+    public static final byte OneMsgFromHead = 0b0011 << 1;
+    public static final byte OneMsgFromNonHead = 0b0100 << 1;
+    public static final byte OneMsgFromHeadAndOneFromNonHead = 0b0101 << 1;
+    public static final byte OneMsgFromHeadToHead = 0b0110 << 1;
+    public static final byte OneMsgFromOldHeadToNonHead = 0b0111 << 1;
+    public static final byte OneMsgFromOldHeadToHead = 0b1000 << 1;
+    public static final byte OneMsgFromOldHeadAndOneFromHead = 0b1001 << 1;
+    public static final byte NO_MSG = 0b1010 << 1;
     
     public static final byte NO_INFO = 0 << 0;
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index a63da20..0a366db 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -79,6 +80,23 @@
                 + "P3ForMergeGraph.xml");
     }
     
+    private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(P4ForPathMergeVertex.class);
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genP4ForMergeGraph() throws IOException {
+        generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+                + "P4ForMergeGraph.xml");
+    }
+    
     private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(TipAddVertex.class);
@@ -191,6 +209,7 @@
         //genBridgeRemoveGraph();
         //genBubbleAddGraph();
         //genBubbleMergeGraph();
+        //genP4ForMergeGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 355bc7a..7bf5aa3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
     private static final int KmerSize = 3;
-    private static final int ReadLength = 7;
+    private static final int ReadLength = 8;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
+    private static final String DATA_INPUT_PATH = "data/graphbuild.test/read2.txt";
     private static final String HDFS_INPUT_PATH = "/webmap";
     private static final String HDFS_OUTPUT_PATH = "/webmap_result";