Merge branch 'genomix/velvet_graphbuilding' into genomix/fullstack_genomix

Conflicts:
	genomix/genomix-data/.classpath
	genomix/genomix-data/.project
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index f849b21..4014377 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -11,6 +11,7 @@
 
 import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
@@ -41,6 +42,12 @@
 
         @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
         public String profiling = "false";
+        
+        @Option(name = "-pseudo-rate", usage = "the rate of pseduHead", required = false)
+        public float pseudoRate = -1;
+        
+        @Option(name = "-max-patitionround", usage = "max rounds in partition phase", required = false)
+        public int maxRound = -1;
     }
 
     public static void run(String[] args, PregelixJob job) throws Exception {
@@ -61,10 +68,18 @@
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
         job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+        job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         if (options.numIteration > 0) {
             job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
             job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+            job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
         }
+        
+        if (options.pseudoRate > 0 && options.pseudoRate <= 1)
+            job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+        if (options.maxRound > 0)
+            job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
         return options;
+        
     }
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 68d70ad..110247e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -37,8 +37,8 @@
         @Override
         public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
                 throws IOException, InterruptedException {
-            if (vertex.getVertexValue().getState() != State.END_VERTEX
-                    && vertex.getVertexValue().getState() != State.MID_VERTEX) {
+            // State.MID_VERTEX no longer exists (value 3 is now START_HALT), so only END_VERTEX vertices are skipped.
+            if (vertex.getVertexValue().getState() != State.END_VERTEX) {
                 getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
             }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
index f9574a4..c5378a2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -21,19 +21,21 @@
     private KmerBytesWritable sourceVertexId;
     private byte adjMap;
     private byte lastGeneCode;
+    private VKmerBytesWritable chainVertexId;
     private byte message;
 
     private byte checkMessage;
 
     public NaiveAlgorithmMessageWritable() {
         sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+        chainVertexId = new VKmerBytesWritable(1);
         adjMap = (byte) 0;
-        lastGeneCode = (byte) 0;
+        lastGeneCode = (byte) -1;
         message = Message.NON;
         checkMessage = (byte) 0;
     }
 
-    public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, byte message) {
+    public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, VKmerBytesWritable chainVertexId, byte message) {
         checkMessage = 0;
         if (sourceVertexId != null) {
             checkMessage |= CheckMessage.SOURCE;
@@ -47,13 +49,18 @@
             checkMessage |= CheckMessage.LASTGENECODE;
             this.lastGeneCode = lastGeneCode;
         }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
         this.message = message;
     }
 
     public void reset() {
         checkMessage = 0;
         adjMap = (byte) 0;
-        lastGeneCode = (byte) 0;
+        lastGeneCode = (byte) -1;
+        chainVertexId.reset(1);
         message = Message.NON;
     }
 
@@ -84,12 +91,27 @@
     }
 
     public void setLastGeneCode(byte lastGeneCode) {
-        if (lastGeneCode != 0) {
+        if (lastGeneCode != -1) {
             checkMessage |= CheckMessage.LASTGENECODE;
             this.lastGeneCode = lastGeneCode;
         }
     }
 
+    public VKmerBytesWritable getChainVertexId() {
+        return chainVertexId;
+    }
+
+    public void setChainVertexId(VKmerBytesWritable chainVertexId) {
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+    }
+    
+    public int getLengthOfChain() {
+        return chainVertexId.getKmerLength();
+    }
+
     public byte getMessage() {
         return message;
     }
@@ -98,6 +120,10 @@
         this.message = message;
     }
 
+    public boolean isGeneCode(){
+        return ((checkMessage & CheckMessage.LASTGENECODE) != 0); 
+    }
+    
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeByte(checkMessage);
@@ -107,6 +133,8 @@
             out.write(adjMap);
         if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
             out.write(lastGeneCode);
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.write(out);
         out.write(message);
     }
 
@@ -120,6 +148,8 @@
             adjMap = in.readByte();
         if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
             lastGeneCode = in.readByte();
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.readFields(in);
         message = in.readByte();
     }
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
new file mode 100644
index 0000000..44d47a0
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -0,0 +1,418 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: KmerBytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  C 00000010 2
+ *  G 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  C 00100000 32
+ *  G 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10	0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable.
+ */
+/**
+ * P3 algorithm for path merge: a partition phase seeded by randomly chosen pseudo heads, then a merge phase
+ */
+public class P3ForPathMergeVertex extends
+        Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+    public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
+    public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
+    public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
+    public static final String MAXROUND = "P3ForPathMergeVertex.maxRound";
+    public static int kmerSize = -1;
+    private int maxIteration = -1;
+    public static float pseudoRate = -1;
+    public static int maxRound = -1;
+
+    private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
+    private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
+
+    private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+    private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+    private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+
+    /**
+     * read kmerSize, maxIteration, pseudoRate and maxRound from the job configuration if not yet set, and reset outgoingMsg
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if(pseudoRate < 0)
+            pseudoRate = getContext().getConfiguration().getFloat(PSEUDORATE, 0.2f);
+        if (maxRound < 0)
+            maxRound = getContext().getConfiguration().getInt(MAXROUND, 2);
+        outgoingMsg.reset();
+    }
+
+    /**
+     * get destination vertex
+     */
+    public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
+        return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+    }
+
+    public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
+        return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+    }
+
+    public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) {
+        VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
+        return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+    }
+
+    /**
+     * send outgoingMsg to every next node flagged in the low 4 bits of adjMap (used by head vertices in startSendMsg)
+     */
+    public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
+        for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+            if ((adjMap & (1 << x)) != 0) {
+                destVertexId.set(getDestVertexId(vertexId, x));
+                sendMsg(destVertexId, outgoingMsg);
+            }
+        }
+    }
+
+    /**
+     * send outgoingMsg to every previous node flagged in the high 4 bits of adjMap (used by rear vertices in startSendMsg)
+     */
+    public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
+        for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+            if (((adjMap >> 4) & (1 << x)) != 0) {
+                destVertexId.set(getPreDestVertexId(vertexId, x));
+                sendMsg(destVertexId, outgoingMsg);
+            }
+        }
+    }
+
+    /**
+     * start sending message
+     */
+    public void startSendMsg() {
+        if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+            outgoingMsg.setMessage(Message.START);
+            sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+            voteToHalt();
+        }
+        if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+            outgoingMsg.setMessage(Message.END);
+            sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+            voteToHalt();
+        }
+    }
+
+    /**
+     * initiate head, rear and path node
+     */
+    public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+        if (msgIterator.hasNext()) {
+            do {
+                if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+                    msgIterator.next();
+                    voteToHalt();
+                } else {
+                    incomingMsg = msgIterator.next();
+                    setState();
+                }
+            } while (msgIterator.hasNext());
+        } else {
+            float random = (float) Math.random();
+            if (random < pseudoRate)
+                markPseudoHead();
+            else{
+                getVertexValue().setState(State.NON_VERTEX);
+                voteToHalt();
+            }
+            /*if (getVertexId().toString().equals("CCTCA") || getVertexId().toString().equals("CTCAG")) //AGTAC CCTCA CTCAG CGCCC ACGCC
+                markPseudoHead();
+            else
+                voteToHalt();*/
+        }
+    }
+
+    /**
+     * mark the pseudoHead
+     */
+    public void markPseudoHead() {
+        getVertexValue().setState(State.PSEUDOHEAD);
+        outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+        destVertexId
+                .set(getPreDestVertexId(getVertexId(), 
+                        GeneCode.getGeneCodeFromBitMap((byte) ((getVertexValue().getAdjMap() >> 4) & 0x0F))));
+        sendMsg(destVertexId, outgoingMsg);
+    }
+
+    /**
+     * mark the pseudoRear
+     */
+    public void markPseudoRear() {
+        if (incomingMsg.getMessage() == Message.FROMPSEUDOHEAD 
+                && getVertexValue().getState() != State.START_VERTEX) {
+            getVertexValue().setState(State.PSEUDOREAR);
+            voteToHalt();
+        }
+        else if(incomingMsg.getMessage() == Message.FROMPSEUDOHEAD 
+                && getVertexValue().getState() == State.START_VERTEX){
+            getVertexValue().setState(State.START_HALT);
+        }
+    }
+
+    /**
+     * set vertex state
+     */
+    public void setState() {
+        if (incomingMsg.getMessage() == Message.START) {
+            getVertexValue().setState(State.START_VERTEX);
+        } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+            getVertexValue().setState(State.END_VERTEX);
+            voteToHalt();
+        } else
+            voteToHalt();
+    }
+    
+    /**
+     * append the incoming message's last gene code, or the last part of its chain, to this vertex's merge chain
+     */
+    public void mergeChainVertex(){
+        if(incomingMsg.isGeneCode() == true){
+            getVertexValue().setMergeChain(
+                    kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+                            incomingMsg.getLastGeneCode()));
+        }
+        else{
+            lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+                    incomingMsg.getChainVertexId()));
+            getVertexValue().setMergeChain(
+                    kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), 
+                            lastKmer));
+        }
+    }
+    
+    /**
+     * head node sends message to path node in merge phase
+     */
+    public void sendMsgToPathVertexMergePhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+        if (getSuperstep() == 3 + 2 * maxRound + 2) {
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+            sendMsg(destVertexId, outgoingMsg);
+        } else {
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if (incomingMsg.getMessage() != Message.STOP) {
+                    mergeChainVertex();
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    destVertexId
+                            .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+                    sendMsg(destVertexId, outgoingMsg);
+                } else {
+                    mergeChainVertex();
+                    byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+                    getVertexValue().setAdjMap(adjMap);
+                    getVertexValue().setState(State.FINAL_VERTEX);
+                    //String source = getVertexValue().getMergeChain().toString();
+                    //System.out.println();
+                }
+            }
+        }
+    }
+
+    /**
+     * path node sends message back to head node in merge phase
+     */
+    public void responseMsgToHeadVertexMergePhase() {
+        deleteVertex(getVertexId());
+        outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+        if(getVertexValue().getLengthOfMergeChain() == 0)
+            outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+        else
+            outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+        if (getVertexValue().getState() == State.END_VERTEX)
+            outgoingMsg.setMessage(Message.STOP);
+        sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+    }
+    
+    /**
+     * head node sends message to path node in partition phase
+     */
+    public void sendMsgToPathVertexPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+        if (getSuperstep() == 4) {
+            getVertexValue().setMergeChain(getVertexId());
+            if(getVertexValue().getState() != State.START_HALT){
+                outgoingMsg.setSourceVertexId(getVertexId());
+                destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+                sendMsg(destVertexId, outgoingMsg);
+                voteToHalt();
+            }
+        } else {
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                // a FROMPSEUDOHEAD message is skipped here; any other message is merged into this vertex's chain
+                if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){
+                    mergeChainVertex();
+                    byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+                    getVertexValue().setAdjMap(adjMap);
+                    if (incomingMsg.getMessage() != Message.STOP 
+                            && incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+                                incomingMsg.getAdjMap()));
+                        sendMsg(destVertexId, outgoingMsg);
+                        voteToHalt();
+                    } else {
+                        //check head or pseudoHead
+                        if (getVertexValue().getState() == State.START_VERTEX
+                                && incomingMsg.getMessage() == Message.STOP) {
+                            getVertexValue().setState(State.FINAL_VERTEX);
+                            //String source = getVertexValue().getMergeChain().toString();
+                            //System.out.println();
+                        } else if(getVertexValue().getState() == State.PSEUDOHEAD
+                                && incomingMsg.getMessage() == Message.STOP)
+                            getVertexValue().setState(State.END_VERTEX);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * path node sends message back to head node in partition phase
+     */
+    public void responseMsgToHeadVertexPartitionPhase() {
+        if (getVertexValue().getState() == State.PSEUDOHEAD)
+            outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+        else {
+            deleteVertex(getVertexId());
+            outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+            if(getVertexValue().getLengthOfMergeChain() == 0)
+                outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+            else
+                outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+            if (getVertexValue().getState() == State.PSEUDOREAR)
+                outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
+            else if (getVertexValue().getState() == State.END_VERTEX)
+                outgoingMsg.setMessage(Message.STOP);
+        }
+        sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+        voteToHalt();
+    }
+    
+    /**
+     * final process the result of partition phase
+     */
+    public void finalProcessPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            mergeChainVertex();
+            byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+            getVertexValue().setAdjMap(adjMap);
+            //check head or pseudoHead
+            if (getVertexValue().getState() == State.START_VERTEX
+                    && incomingMsg.getMessage() == Message.STOP) {
+                getVertexValue().setState(State.FINAL_VERTEX);
+                //String source = getVertexValue().getMergeChain().toString();
+                //System.out.println();
+            } else if(getVertexValue().getState() == State.PSEUDOHEAD
+                    && incomingMsg.getMessage() == Message.STOP)
+                getVertexValue().setState(State.END_VERTEX);
+        }
+    }
+    /**
+     * After partition phase, reset state: pseudoHead and pseudoRear -> NON_VERTEX, START_HALT -> START_VERTEX
+     */
+    public void resetState() {
+        if (getVertexValue().getState() == State.PSEUDOHEAD || getVertexValue().getState() == State.PSEUDOREAR) {
+            getVertexValue().setState(State.NON_VERTEX);
+        }
+        if(getVertexValue().getState() == State.START_HALT)
+            getVertexValue().setState(State.START_VERTEX);
+    }
+
+    @Override
+    public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1)
+            startSendMsg();
+        else if (getSuperstep() == 2)
+            initState(msgIterator);
+        else if (getSuperstep() == 3) {
+            if (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                markPseudoRear();
+            }
+        } else if (getSuperstep() % 2 == 0 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
+            sendMsgToPathVertexPartitionPhase(msgIterator);
+        } else if (getSuperstep() % 2 == 1 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                responseMsgToHeadVertexPartitionPhase();
+            }
+        } else if(getSuperstep() == 3 + 2 * maxRound + 1 && getSuperstep() <= maxIteration){
+            finalProcessPartitionPhase(msgIterator);
+        } else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
+            resetState();
+            if(getVertexValue().getState() == State.START_VERTEX)
+                sendMsgToPathVertexMergePhase(msgIterator);
+            voteToHalt();
+        } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                responseMsgToHeadVertexMergePhase();
+            }
+            voteToHalt();
+        } else
+            voteToHalt();
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(P3ForPathMergeVertex.class.getSimpleName());
+        job.setVertexClass(P3ForPathMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index 85649b3..bb288ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -2,11 +2,13 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,15 +48,15 @@
     }
 
     public static void main(String[] args) throws IOException {
-        /*Path dir = new Path("data/test8m");
-        Path outDir = new Path("data/input/test");
-        FileUtils.cleanDirectory(new File("data/input/test"));
+        Path dir = new Path("data/split.aa");
+        Path outDir = new Path("data/input");
+        FileUtils.cleanDirectory(new File("data/input"));
         Path inFile = new Path(dir, "part-0");
-        Path outFile = new Path(outDir, "part-0-out-100");
-        generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100);*/
-        String inFile = "data/shortjump_1.head8M.fastq";
+        Path outFile = new Path(outDir, "part-0-out-1000");
+        generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
+      /*  String inFile = "data/shortjump_1.head8M.fastq";
         String outFile = "data/testGeneFile";
-        generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);
+        generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
     }
 
     public static String readTextFile(String fileName, int numOfLines) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index 4332471..fa5f73b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -6,7 +6,9 @@
     public static final byte START = 1;
     public static final byte END = 2;
     public static final byte STOP = 3;
-    public static final byte PSEUDOREAR = 4;
+    public static final byte FROMPSEUDOHEAD = 4;
+    public static final byte FROMPSEUDOREAR = 5;
+    public static final byte FROMSELF = 6;
 
     public final static class MESSAGE_CONTENT {
 
@@ -25,8 +27,14 @@
                 case STOP:
                     r = "STOP";
                     break;
-                case PSEUDOREAR:
-                    r = "PSEUDOREAR";
+                case FROMPSEUDOHEAD:
+                    r = "FROMPSEUDOHEAD";
+                    break;
+                case FROMPSEUDOREAR:
+                    r = "FROMPSEUDOREAR";
+                    break;
+                case FROMSELF:
+                    r = "FROMSELF";
                     break;
             }
             return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index c1f4696..6730299 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -5,7 +5,7 @@
     public static final byte NON_VERTEX = 0;
     public static final byte START_VERTEX = 1;
     public static final byte END_VERTEX = 2;
-    public static final byte MID_VERTEX = 3;
+    public static final byte START_HALT = 3;
     public static final byte PSEUDOHEAD = 4;
     public static final byte PSEUDOREAR = 5;
     public static final byte FINAL_VERTEX = 6;
@@ -25,8 +25,8 @@
                 case END_VERTEX:
                     r = "END_VERTEX";
                     break;
-                case MID_VERTEX:
-                    r = "MID_VERTEX";
+                case START_HALT:
+                    r = "START_HALT";
                     break;
                 case PSEUDOHEAD:
                     r = "PSEUDOHEAD";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 8b138f2..809ea34 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
 import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
@@ -50,10 +51,30 @@
     private static void genLogAlgorithmForMergeGraph() throws IOException {
         generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
     }
+    
+    private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(P3ForPathMergeVertex.class);
+        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+        job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 5);
+        job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.4f);
+        job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 1);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genP3ForMergeGraph() throws IOException {
+        generateP3ForMergeGraphJob("P3ForMergeGraph", outputBase
+                + "P3ForMergeGraph.xml");
+    }
 
     public static void main(String[] args) throws IOException {
-        genNaiveAlgorithmForMergeGraph();
-        genLogAlgorithmForMergeGraph();
+        //genNaiveAlgorithmForMergeGraph();
+        //genLogAlgorithmForMergeGraph();
+        genP3ForMergeGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index dcbbb79..c8aaa84 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,19 +45,18 @@
 
     public static final String PreFix = "data/PathTestSet"; //"graphbuildresult";
     public static final String[] TestDir = { PreFix + File.separator
-    //+  "split.aa"};
-    //+ "split.aa"};/*, PreFix + File.separator
+    + "LongPath"};/*, PreFix + File.separator
             /*+ "CyclePath"};, PreFix + File.separator
             + "SimplePath", PreFix + File.separator
             + "SinglePath", PreFix + File.separator
             + "TreePath"};*/
-            + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
+           /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
             PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
             PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
             PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
             PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
             PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
-            PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };
+            PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
     private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";