add p4 for path merge
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index bf83333..87016b7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -139,10 +139,12 @@
         if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
             outgoingMsg.setMessage(Message.START);
             sendMsg(getVertexId(), outgoingMsg); //send to itself
+            voteToHalt();
         }
         if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
             outgoingMsg.setMessage(Message.END);
             sendMsg(getVertexId(), outgoingMsg); //send to itself
+            voteToHalt();
         }
     }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
new file mode 100644
index 0000000..130d1cf
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -0,0 +1,353 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10    0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class P4ForPathMergeVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    public static final String KMER_SIZE = "P4ForPathMergeVertex.kmerSize";
+    public static final String ITERATIONS = "P4ForPathMergeVertex.iteration";
+    public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
+    public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
+    public static int kmerSize = -1;
+    private int maxIteration = -1;
+
+    private static long randSeed = -1;
+    private float probBeingRandomHead = -1;
+    private Random randGenerator;
+    
+    private PositionWritable curID = new PositionWritable();
+    private PositionWritable nextID = new PositionWritable();
+    private PositionWritable prevID = new PositionWritable();
+    private boolean hasNext;
+    private boolean hasPrev;
+    private boolean curHead;
+    private boolean nextHead;
+    private boolean prevHead;
+    private byte headFlag;
+    private byte tailFlag;
+    private byte outFlag;
+    
+    private MessageWritable incomingMsg = new MessageWritable();
+    private MessageWritable outgoingMsg = new MessageWritable();
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
+    
+    /**
+     * initiate kmerSize, maxIteration
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if (randSeed < 0)
+            randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+        randGenerator = new Random(randSeed);
+        if (probBeingRandomHead < 0)
+            probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
+        hasNext = false;
+        hasPrev = false;
+        curHead = false;
+        nextHead = false;
+        prevHead = false;
+        outgoingMsg.reset();
+    }
+
+    protected boolean isNodeRandomHead(PositionWritable nodeID) {
+        // "deterministically random", based on node id
+        randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+        return randGenerator.nextFloat() < probBeingRandomHead;
+    }
+
+    /**
+     * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
+     */
+    protected boolean setNextInfo(ValueStateWritable value) {
+        if (value.getFFList().getCountOfPosition() > 0) {
+            nextID.set(value.getFFList().getPosition(0));
+            nextHead = isNodeRandomHead(nextID);
+            return true;
+        }
+        if (value.getFRList().getCountOfPosition() > 0) {
+            nextID.set(value.getFRList().getPosition(0));
+            nextHead = isNodeRandomHead(nextID);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
+     */
+    protected boolean setPrevInfo(ValueStateWritable value) {
+        if (value.getRRList().getCountOfPosition() > 0) {
+            prevID.set(value.getRRList().getPosition(0));
+            prevHead = isNodeRandomHead(prevID);
+            return true;
+        }
+        if (value.getRFList().getCountOfPosition() > 0) {
+            prevID.set(value.getRFList().getPosition(0));
+            prevHead = isNodeRandomHead(prevID);
+            return true;
+        }
+        return false;
+    }
+    
+    /**
+     * get destination vertex
+     */
+    public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
+        return posIterator.next();
+    }
+
+    public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+        if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+            posIterator = value.getRFList().iterator();
+        else // #RRList() > 0
+            posIterator = value.getRRList().iterator();
+        return posIterator.next();
+    }
+
+    /**
+     * head send message to all next nodes
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * head send message to all previous nodes
+     */
+    public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * start sending message
+     */
+    public void startSendMsg() {
+        if (VertexUtil.isHeadVertex(getVertexValue())) {
+            outgoingMsg.setMessage(Message.START);
+            sendMsgToAllNextNodes(getVertexValue());
+            voteToHalt();
+        }
+        if (VertexUtil.isRearVertex(getVertexValue())) {
+            outgoingMsg.setMessage(Message.END);
+            sendMsgToAllPreviousNodes(getVertexValue());
+            voteToHalt();
+        }
+        if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+            outgoingMsg.setMessage(Message.START);
+            sendMsg(getVertexId(), outgoingMsg); //send to itself
+            voteToHalt();
+        }
+        if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+            outgoingMsg.setMessage(Message.END);
+            sendMsg(getVertexId(), outgoingMsg); //send to itself
+            voteToHalt();
+        }
+    }
+
+    /**
+     * initiate head, rear and path node
+     */
+    public void initState(Iterator<MessageWritable> msgIterator) {
+        while (msgIterator.hasNext()) {
+            if (!VertexUtil.isPathVertex(getVertexValue())
+                    && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+                    && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+                msgIterator.next();
+                voteToHalt();
+            } else {
+                incomingMsg = msgIterator.next();
+                setState();
+            }
+        }
+    }
+
+    /**
+     * set vertex state
+     */
+    public void setState() {
+        if (incomingMsg.getMessage() == Message.START) {
+            getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
+        } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+            getVertexValue().setState(MessageFlag.IS_TAIL);
+            getVertexValue().setMergeChain(getVertexValue().getMergeChain());
+            //voteToHalt();
+        } //else
+            //voteToHalt();
+    }
+    
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1)
+            startSendMsg();
+        else if (getSuperstep() == 2)
+            initState(msgIterator);
+        else{
+            // Node may be marked as head b/c it's a real head or a real tail
+            headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
+            tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
+            outFlag = (byte) (headFlag | tailFlag);
+            
+            // only PATH vertices are present. Find the ID's for my neighbors
+            curID.set(getVertexId());
+            
+            curHead = isNodeRandomHead(curID);
+            
+            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
+            // We prevent merging towards non-path nodes
+            hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
+            hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+                outFlag |= MessageFlag.FROM_SELF;
+                getVertexValue().setState(outFlag);
+                voteToHalt();
+            }
+            if (hasNext || hasPrev) {
+                if (curHead) {
+                    if (hasNext && !nextHead) {
+                        // compress this head to the forward tail
+                        outFlag |= MessageFlag.FROM_PREDECESSOR;
+                        outgoingMsg.setMessage(outFlag);
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        destVertexId.set(getNextDestVertexId(getVertexValue()));
+                        sendMsg(destVertexId, outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (hasPrev && !prevHead) {
+                        // compress this head to the reverse tail
+                        outFlag |= MessageFlag.FROM_SUCCESSOR;
+                        outgoingMsg.setMessage(outFlag);
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        destVertexId.set(getNextDestVertexId(getVertexValue()));
+                        sendMsg(destVertexId, outgoingMsg);
+                        deleteVertex(getVertexId());
+                    }
+                } else {
+                    // I'm a tail
+                    if (hasNext && hasPrev) {
+                        if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+                            // tails on both sides, and I'm the "local minimum"
+                            // compress me towards the tail in forward dir
+                            outFlag |= MessageFlag.FROM_PREDECESSOR;
+                            outgoingMsg.setMessage(outFlag);
+                            outgoingMsg.setSourceVertexId(getVertexId());
+                            destVertexId.set(getNextDestVertexId(getVertexValue()));
+                            sendMsg(destVertexId, outgoingMsg);
+                            deleteVertex(getVertexId());
+                        }
+                    } else if (!hasPrev) {
+                        // no previous node
+                        if (!nextHead && curID.compareTo(nextID) < 0) {
+                            // merge towards tail in forward dir
+                            outFlag |= MessageFlag.FROM_PREDECESSOR;
+                            outgoingMsg.setMessage(outFlag);
+                            outgoingMsg.setSourceVertexId(getVertexId());
+                            destVertexId.set(getNextDestVertexId(getVertexValue()));
+                            sendMsg(destVertexId, outgoingMsg);
+                            deleteVertex(getVertexId());
+                        }
+                    } else if (!hasNext) {
+                        // no next node
+                        if (!prevHead && curID.compareTo(prevID) < 0) {
+                            // merge towards tail in reverse dir
+                            outFlag |= MessageFlag.FROM_SUCCESSOR;
+                            outgoingMsg.setMessage(outFlag);
+                            outgoingMsg.setSourceVertexId(getVertexId());
+                            destVertexId.set(getNextDestVertexId(getVertexValue()));
+                            sendMsg(destVertexId, outgoingMsg);
+                            deleteVertex(getVertexId());
+                        }
+                    }
+                }
+            }
+            
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(P4ForPathMergeVertex.class.getSimpleName());
+        job.setVertexClass(P4ForPathMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
new file mode 100644
index 0000000..e83598a
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class MessageFlag {
+    public static final byte EMPTY_MESSAGE = 0;
+    public static final byte FROM_SELF = 1;
+    public static final byte FROM_SUCCESSOR = 1 << 1;
+    public static final byte FROM_PREDECESSOR = 1 << 2;
+    public static final byte IS_HEAD = 1 << 3;
+    public static final byte IS_TAIL = 1 << 4;
+
+    public static String getFlagAsString(byte code) {
+        // TODO: allow multiple flags to be set
+        switch (code) {
+            case EMPTY_MESSAGE:
+                return "EMPTY_MESSAGE";
+            case FROM_SELF:
+                return "FROM_SELF";
+            case FROM_SUCCESSOR:
+                return "FROM_SUCCESSOR";
+            case FROM_PREDECESSOR:
+                return "FROM_PREDECESSOR";
+            case IS_HEAD:
+                return "IS_HEAD";
+            case IS_TAIL:
+                return "IS_TAIL";
+        }
+        return "ERROR_BAD_MESSAGE";
+    }
+}