debugging p4 and fix random seed issue
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 88cfb79..ae921c4 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -561,6 +561,8 @@
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
+        
+        //another comment
 
         FileInputFormat.addInputPaths(conf, inputPath);
         Path outputPath = new Path(inputPath + ".h4merge.tmp");
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index ec91767..a3ea666 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -128,6 +128,8 @@
                 return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
             }
         };
+        // test comment 
+        
         StringBuilder sb = new StringBuilder();
         String delim = "";
         for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$",  "") + "*"), updateFilter)) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 6729c78..4148982 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -112,6 +112,7 @@
             throws IOException {
         byte updateFlag = updateMsg.getFlag();
         NodeWritable updateNode = updateMsg.getNode();
+        
         if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
             // this message wants to update the edges of node.
             // remove position and merge its position lists with node
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9238f9d..162663f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -53,27 +53,32 @@
      * get destination vertex
      */
     public PositionWritable getNextDestVertexId(VertexValueWritable value) {
-        if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+        if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
             posIterator = value.getFFList().iterator();
             outFlag |= MessageFlag.DIR_FF;
-        }
-        else{ // #FRList() > 0
+            return posIterator.next();
+        } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
             posIterator = value.getFRList().iterator();
             outFlag |= MessageFlag.DIR_FR;
+            return posIterator.next();
+        } else {
+          return null;  
         }
-        return posIterator.next();
+        
     }
 
     public PositionWritable getPreDestVertexId(VertexValueWritable value) {
-        if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+        if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
             posIterator = value.getRFList().iterator();
             outFlag |= MessageFlag.DIR_RF;
-        }
-        else{ // #RRList() > 0
+            return posIterator.next();
+        } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
             posIterator = value.getRRList().iterator();
             outFlag |= MessageFlag.DIR_RR;
+            return posIterator.next();
+        } else {
+            return null;
         }
-        return posIterator.next();
     }
 
     /**
@@ -223,7 +228,7 @@
     public void broadcastUpdateMsg(){
         if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
             outFlag |= MessageFlag.IS_HEAD;
-        switch(getVertexValue().getState() & 0b0001){
+        switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){
             case MessageFlag.SHOULD_MERGEWITHPREV:
                 setSuccessorAdjMsg();
                 if(ifFlipWithPredecessor())
@@ -231,7 +236,8 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+                if(getNextDestVertexId(getVertexValue()) != null)
+                    sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
                 break;
             case MessageFlag.SHOULD_MERGEWITHNEXT:
                 setPredecessorAdjMsg();
@@ -240,7 +246,8 @@
                 outgoingMsg.setFlag(outFlag);
                 outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
                 outgoingMsg.setSourceVertexId(getVertexId());
-                sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+                if(getPreDestVertexId(getVertexValue()) != null)
+                    sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
                 break; 
         }
     }
@@ -324,7 +331,7 @@
      * This vertex tries to merge with next vertex and send update msg to neighber
      * @throws IOException 
      */
-    public void sendUpMsgToPredecessor(){
+    public void sendUpdateMsgToPredecessor(){
         byte state = getVertexValue().getState();
         state |= MessageFlag.SHOULD_MERGEWITHNEXT;
         getVertexValue().setState(state);
@@ -339,7 +346,7 @@
      * This vertex tries to merge with next vertex and send update msg to neighber
      * @throws IOException 
      */
-    public void sendUpMsgToSuccessor(){
+    public void sendUpdateMsgToSuccessor(){
         byte state = getVertexValue().getState();
         state |= MessageFlag.SHOULD_MERGEWITHPREV;
         getVertexValue().setState(state);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index f250afc..ef699fc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,14 +1,10 @@
 package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.hadoop.io.NullWritable;
-
 import edu.uci.ics.genomix.type.PositionWritable;
 
-import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
@@ -17,7 +13,6 @@
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 
 /*
  * vertexId: BytesWritable
@@ -55,7 +50,7 @@
     public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
     public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
 
-    private static long randSeed = -1;
+    private static long randSeed = 1;
     private float probBeingRandomHead = -1;
     private Random randGenerator;
     
@@ -79,8 +74,9 @@
             kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
         if (maxIteration < 0)
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-        if (randSeed < 0)
-            randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        //if (randSeed < 0)
+        //    randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        randSeed = getSuperstep();
         randGenerator = new Random(randSeed);
         if (probBeingRandomHead < 0)
             probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
@@ -95,7 +91,9 @@
 
     protected boolean isNodeRandomHead(PositionWritable nodeID) {
         // "deterministically random", based on node id
-        randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+        //randGenerator.setSeed(randSeed);
+        //randSeed = randGenerator.nextInt();
+        randGenerator.setSeed((randSeed ^ nodeID.hashCode()) * 100000);//randSeed + nodeID.hashCode()
         return randGenerator.nextFloat() < probBeingRandomHead;
     }
 
@@ -157,36 +155,37 @@
             hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
             if (hasNext || hasPrev) {
                 if (curHead) {
-                    if (hasNext && !nextHead) {
+                    if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
                         // compress this head to the forward tail
-                        sendUpMsgToPredecessor(); //TODO up -> update  From -> to
-                    } else if (hasPrev && !prevHead) {
+                        sendUpdateMsgToPredecessor(); //TODO up -> update  From -> to
+                    } else if (hasPrev && !prevHead && (getPreDestVertexId(getVertexValue()) != null)) {
                         // compress this head to the reverse tail
-                        sendUpMsgToSuccessor();
+                        sendUpdateMsgToSuccessor();
                     }
                 } else {
                     // I'm a tail
                     if (hasNext && hasPrev) {
-                        if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
-                            sendUpMsgToPredecessor();
+                            sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasPrev) {
                         // no previous node
-                        if (!nextHead && curID.compareTo(nextID) < 0) {
+                        if (!nextHead && curID.compareTo(nextID) > 0) {
                             // merge towards tail in forward dir
-                            sendUpMsgToPredecessor();
+                            sendUpdateMsgToPredecessor();
                         }
                     } else if (!hasNext) {
                         // no next node
-                        if (!prevHead && curID.compareTo(prevID) < 0) {
+                        if (!prevHead && curID.compareTo(prevID) > 0) {
                             // merge towards tail in reverse dir
-                            sendUpMsgToSuccessor();
+                            sendUpdateMsgToSuccessor();
                         }
                     }
                 }
             }
+            voteToHalt();
         }
         else if (getSuperstep() % 4 == 0){
             //update neighber
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index a63da20..5a113e0 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -79,6 +80,23 @@
                 + "P3ForMergeGraph.xml");
     }
     
+    private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(P4ForPathMergeVertex.class);
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genP4ForMergeGraph() throws IOException {
+        generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+                + "P4ForMergeGraph.xml");
+    }
+    
     private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(TipAddVertex.class);
@@ -183,7 +201,7 @@
     
     public static void main(String[] args) throws IOException {
         //genNaiveAlgorithmForMergeGraph();
-        genLogAlgorithmForMergeGraph();
+        //genLogAlgorithmForMergeGraph();
         //genP3ForMergeGraph();
         //genTipAddGraph();
         //genTipRemoveGraph();
@@ -191,6 +209,7 @@
         //genBridgeRemoveGraph();
         //genBubbleAddGraph();
         //genBubbleMergeGraph();
+        genP4ForMergeGraph();
     }
 
 }