update data clean 2013-06-06
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
new file mode 100644
index 0000000..55ddb1b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -0,0 +1,185 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class MergeBubbleMessageWritable implements WritableComparable<MergeBubbleMessageWritable> {
+    /**
+     * sourceVertexId stores source vertexId when headVertex sends the message
+     * stores neighber vertexValue when pathVertex sends the message
+     * file stores the point to the file that stores the chains of connected DNA
+     */
+    private PositionWritable sourceVertexId;
+    private KmerBytesWritable chainVertexId;
+    private AdjacencyListWritable neighberNode; //incoming or outgoing
+    private byte message;
+    private PositionWritable startVertexId;
+
+    private byte checkMessage;
+
+    public MergeBubbleMessageWritable() {
+        sourceVertexId = new PositionWritable();
+        chainVertexId = new KmerBytesWritable(0);
+        neighberNode = new AdjacencyListWritable();
+        message = Message.NON;
+        startVertexId = new PositionWritable();
+        checkMessage = (byte) 0;
+    }
+    
+    public void set(MessageWritable msg) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(msg.getSourceVertexId());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(msg.getChainVertexId());
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(msg.getNeighberNode());
+        }
+        this.message = msg.getMessage();
+    }
+
+    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+        this.message = message;
+    }
+
+    public void reset() {
+        checkMessage = 0;
+        chainVertexId.reset(1);
+        neighberNode.reset();
+        message = Message.NON;
+    }
+
+    public PositionWritable getSourceVertexId() {
+        return sourceVertexId;
+    }
+
+    public void setSourceVertexId(PositionWritable sourceVertexId) {
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+        }
+    }
+    
+    public KmerBytesWritable getChainVertexId() {
+        return chainVertexId;
+    }
+
+    public void setChainVertexId(KmerBytesWritable chainVertexId) {
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+    }
+    
+    public AdjacencyListWritable getNeighberNode() {
+        return neighberNode;
+    }
+
+    public void setNeighberNode(AdjacencyListWritable neighberNode) {
+        if(neighberNode != null){
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+    }
+    
+    public int getLengthOfChain() {
+        return chainVertexId.getKmerLength();
+    }
+
+    public PositionWritable getStartVertexId() {
+        return startVertexId;
+    }
+
+    public void setStartVertexId(PositionWritable startVertexId) {
+        if(startVertexId != null){
+            checkMessage |= CheckMessage.START;
+            this.startVertexId.set(startVertexId);
+        }
+    }
+
+    public byte getMessage() {
+        return message;
+    }
+
+    public void setMessage(byte message) {
+        this.message = message;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeByte(checkMessage);
+        if ((checkMessage & CheckMessage.SOURCE) != 0)
+            sourceVertexId.write(out);
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.write(out);
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+            neighberNode.write(out);
+        if ((checkMessage & CheckMessage.START) != 0)
+            startVertexId.write(out);
+        out.write(message);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.reset();
+        checkMessage = in.readByte();
+        if ((checkMessage & CheckMessage.SOURCE) != 0)
+            sourceVertexId.readFields(in);
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.readFields(in);
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+            neighberNode.readFields(in);
+        if ((checkMessage & CheckMessage.START) != 0)
+            startVertexId.readFields(in);
+        message = in.readByte();
+    }
+
+    @Override
+    public int hashCode() {
+        return sourceVertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MergeBubbleMessageWritable) {
+            MergeBubbleMessageWritable tp = (MergeBubbleMessageWritable) o;
+            return sourceVertexId.equals(tp.sourceVertexId);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return sourceVertexId.toString();
+    }
+
+    public int compareTo(MergeBubbleMessageWritable tp) {
+        return sourceVertexId.compareTo(tp.sourceVertexId);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index f0a6b58..6a71344 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -31,6 +31,23 @@
         message = Message.NON;
         checkMessage = (byte) 0;
     }
+    
+    public void set(MessageWritable msg) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(msg.getSourceVertexId());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(msg.getChainVertexId());
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(msg.getNeighberNode());
+        }
+        this.message = msg.getMessage();
+    }
 
     public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
         checkMessage = 0;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 9a46a67..8716d8d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.genomix.pregelix.operator.bridgeremove;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import org.apache.hadoop.io.NullWritable;
 
@@ -11,6 +12,8 @@
 import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 
 /*
  * vertexId: BytesWritable
@@ -46,31 +49,137 @@
 public class BridgeRemoveVertex extends
         Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
     public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
-    public static final String ITERATIONS = "BridgeRemoveVertex.iteration";
+    public static final String LENGTH = "BridgeRemoveVertex.length";
     public static int kmerSize = -1;
-    private int maxIteration = -1;
+    private int length = -1;
 
     private MessageWritable incomingMsg = new MessageWritable();
     private MessageWritable outgoingMsg = new MessageWritable();
-
+    private ArrayList<MessageWritable> receivedMsg = new ArrayList<MessageWritable>();
     
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
     /**
      * initiate kmerSize, maxIteration
      */
     public void initVertex() {
         if (kmerSize == -1)
             kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-        if (maxIteration < 0)
-            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if(length == -1)
+            length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
         outgoingMsg.reset();
+        receivedMsg.clear();
+    }
+    
+    /**
+     * head send message to all next nodes
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMFF);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMFR);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * head send message to all previous nodes
+     */
+    public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMRF);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMRR);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
     }
 
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
         initVertex();
         if (getSuperstep() == 1) {
-          
-        } 
+            if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+                sendMsgToAllNextNodes(getVertexValue());
+            }
+           else if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+                sendMsgToAllPreviousNodes(getVertexValue());
+           }
+        }
+        else if (getSuperstep() == 2){
+            int i = 0;
+            while (msgIterator.hasNext()) {
+                if(i == 3)
+                    break;
+                receivedMsg.add(msgIterator.next());
+                i++;
+            }
+            if(receivedMsg.size() == 2){
+                if(getVertexValue().getLengthOfMergeChain() > length){
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    if(receivedMsg.get(0).getMessage() == AdjMessage.FROMFF 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR){
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFF);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFF 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFR);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFF);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFR);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    }
+                }
+            }
+        }
+        else if(getSuperstep() == 3){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+                    //remove incomingMsg.getSourceId from RR positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+                  //remove incomingMsg.getSourceId from RF positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+                  //remove incomingMsg.getSourceId from FR positionList
+                } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+                  //remove incomingMsg.getSourceId from FF positionList
+                }
+            }
+        }
+        voteToHalt();
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index a9c9c5b..e16cd20 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -1,6 +1,10 @@
 package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.hadoop.io.NullWritable;
 
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -9,8 +13,10 @@
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 
 /*
  * vertexId: BytesWritable
@@ -44,15 +50,19 @@
  * Naive Algorithm for path merge graph
  */
 public class BubbleMergeVertex extends
-        Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MergeBubbleMessageWritable> {
     public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
     public static final String ITERATIONS = "BubbleMergeVertex.iteration";
     public static int kmerSize = -1;
     private int maxIteration = -1;
 
-    private MessageWritable incomingMsg = new MessageWritable();
-    private MessageWritable outgoingMsg = new MessageWritable();
+    private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
+    private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
 
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
+    private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsg = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
+    private ArrayList<MergeBubbleMessageWritable> tmpMsg = new ArrayList<MergeBubbleMessageWritable>();
     
     /**
      * initiate kmerSize, maxIteration
@@ -64,13 +74,121 @@
             maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
         outgoingMsg.reset();
     }
+    /**
+     * get destination vertex
+     */
+    public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
+        return posIterator.next();
+    }
 
+    /**
+     * head send message to all next nodes
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+    
     @Override
-    public void compute(Iterator<MessageWritable> msgIterator) {
+    public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
         initVertex();
         if (getSuperstep() == 1) {
-          
-        } 
+            if(VertexUtil.isHeadVertex(getVertexValue())){
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsgToAllNextNodes(getVertexValue());
+            }
+        } else if (getSuperstep() == 2){
+            while (msgIterator.hasNext()) {
+                if(VertexUtil.isPathVertex(getVertexValue())){
+                    outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+                    destVertexId.set(getNextDestVertexId(getVertexValue()));
+                    sendMsg(destVertexId, outgoingMsg);
+                }
+            }
+        } else if (getSuperstep() == 3){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(!receivedMsg.containsKey(incomingMsg.getStartVertexId())){
+                    tmpMsg.clear();
+                    tmpMsg.add(incomingMsg);
+                    receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+                }
+                else{
+                    tmpMsg.clear();
+                    tmpMsg.addAll(receivedMsg.get(incomingMsg.getStartVertexId()));
+                    tmpMsg.add(incomingMsg);
+                    receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+                }
+            }
+            for(PositionWritable prevId : receivedMsg.keySet()){
+                tmpMsg = receivedMsg.get(prevId);
+                if(tmpMsg.size() > 1){
+                    //find the node with largest length of mergeChain
+                    boolean flag = true; //the same length
+                    int maxLength = tmpMsg.get(0).getLengthOfChain();
+                    PositionWritable max = tmpMsg.get(0).getSourceVertexId();
+                    for(int i = 1; i < tmpMsg.size(); i++){
+                        if(tmpMsg.get(i).getLengthOfChain() != maxLength)
+                            flag = false;
+                        if(tmpMsg.get(i).getLengthOfChain() > maxLength){
+                            maxLength = tmpMsg.get(i).getLengthOfChain();
+                            max = tmpMsg.get(i).getSourceVertexId();
+                        }
+                    }
+                    //send merge or unchange Message to node with largest length
+                    if(flag == true){
+                        //send unchange Message to node with largest length
+                        //we can send no message to complete this step
+                        //send delete Message to node which doesn't have largest length
+                        for(int i = 0; i < tmpMsg.size(); i++){
+                            if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+                                outgoingMsg.setMessage(AdjMessage.KILL);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            } else {
+                                outgoingMsg.setMessage(AdjMessage.UNCHANGE);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            }
+                        }
+                    } else{
+                        //send merge Message to node with largest length
+                        for(int i = 0; i < tmpMsg.size(); i++){
+                            if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+                                outgoingMsg.setMessage(AdjMessage.KILL);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            } else {
+                                outgoingMsg.setMessage(AdjMessage.MERGE);
+                                /* add other node in message */
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            }
+                        }
+                    }
+                }
+            }
+        } else if (getSuperstep() == 4){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.KILL){
+                    deleteVertex(getVertexId());
+                } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
+                    //merge with small node
+                }
+            }
+        }
+        voteToHalt();
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 534da5c..4a174ec 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -11,7 +11,7 @@
 import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
 import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 
 /*
@@ -68,26 +68,46 @@
 
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
-        initVertex(); //getVertexValue().getLengthOfMergeChain() < length
+        initVertex(); 
         if(getSuperstep() == 1){
             if(VertexUtil.isIncomingTipVertex(getVertexValue())){
             	if(getVertexValue().getLengthOfMergeChain() > length){
-            		if(getVertexValue().getOutgoingList().getCountOfPosition() != 0){
-	            		if(getVertexValue().getFFList().getCountOfPosition() > 0)
-	            			outgoingMsg.setMessage(Message.TOFORWARD);
-	            		else if(getVertexValue().getFRList().getCountOfPosition() > 0)
-	            			outgoingMsg.setMessage(Message.TOREVERSE);
-	            		outgoingMsg.setSourceVertexId(getVertexId());
-            		}
+            		if(getVertexValue().getFFList().getCountOfPosition() > 0)
+            			outgoingMsg.setMessage(AdjMessage.FROMFF);
+            		else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+            			outgoingMsg.setMessage(AdjMessage.FROMFR);
+            		outgoingMsg.setSourceVertexId(getVertexId());
+            		deleteVertex(getVertexId());
             	}
-            	
             }
             else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
-            	
+                if(getVertexValue().getLengthOfMergeChain() > length){
+                    if(getVertexValue().getRFList().getCountOfPosition() > 0)
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                    else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    deleteVertex(getVertexId());
+                }
+            }
+            else if(VertexUtil.isSingleVertex(getVertexValue())){
+                if(getVertexValue().getLengthOfMergeChain() > length)
+                    deleteVertex(getVertexId());
             }
         }
         else if(getSuperstep() == 2){
-        	
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+                    //remove incomingMsg.getSourceId from RR positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+                  //remove incomingMsg.getSourceId from RF positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+                  //remove incomingMsg.getSourceId from FR positionList
+                } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+                  //remove incomingMsg.getSourceId from FF positionList
+                }
+            }
         }
         voteToHalt();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
new file mode 100644
index 0000000..ca8d795
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class AdjMessage {
+    public static final byte FROMFF = 0;
+    public static final byte FROMFR = 1;
+    public static final byte FROMRF = 2;
+    public static final byte FROMRR = 3;
+    public static final byte NON = 4;
+    public static final byte UNCHANGE = 5;
+    public static final byte MERGE = 6;
+    public static final byte KILL = 7;
+    
+    public final static class ADJMESSAGE_CONTENT {
+        public static String getContentFromCode(byte code) {
+            String r = "";
+            switch (code) {
+                case FROMFF:
+                    r = "FROMFF";
+                    break;
+                case FROMFR:
+                    r = "FROMFR";
+                    break;
+                case FROMRF:
+                    r = "FROMRF";
+                    break;
+                case FROMRR:
+                    r = "FROMRR";
+                    break;
+                case NON:
+                    r = "NON";
+                    break;
+                case UNCHANGE:
+                    r = "UNCHANGE";
+                    break;
+                case MERGE:
+                    r = "MERGE";
+                    break;
+                case KILL:
+                    r = "KILL";
+                    break;
+            }
+            return r;
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 6e6a97a..c7bcf48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -8,6 +8,7 @@
     public static final byte MESSAGE = 1 << 3;
     public static final byte STATE = 1 << 4;
     public static final byte LASTGENECODE = 1 << 5;
+    public static final byte START = 1 << 6;
 
     public final static class CheckMessage_CONTENT {
 
@@ -32,6 +33,9 @@
                 case LASTGENECODE:
                     r = "LASTGENECODE";
                     break;
+                case START:
+                    r = "START";
+                    break;
             }
             return r;
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index b97f0bb..4644383 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -8,8 +8,8 @@
     public static final byte STOP = 3;
     public static final byte FROMPSEUDOHEAD = 4;
     public static final byte FROMPSEUDOREAR = 5;
-    public static final byte TOFORWARD = 6;
-    public static final byte TOREVERSE = 7;
+    public static final byte IN = 6;
+    public static final byte OUT = 7;
 
     public final static class MESSAGE_CONTENT {
 
@@ -34,11 +34,11 @@
                 case FROMPSEUDOREAR:
                     r = "FROMPSEUDOREAR";
                     break;
-                case TOFORWARD:
-                    r = "TOFORWARD";
+                case IN:
+                    r = "IN";
                     break;
-                case TOREVERSE:
-                    r = "TOREVERSE";
+                case OUT:
+                    r = "OUT";
                     break;
             }
             return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 19839c7..772690d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -58,4 +58,25 @@
     public static boolean isOutgoingTipVertex(ValueStateWritable value){
     	return value.inDegree() == 1 && value.outDegree() == 0;
     }
+    
+    /**
+     * check if vertex is single
+     */
+    public static boolean isSingleVertex(ValueStateWritable value){
+        return value.inDegree() == 0 && value.outDegree() == 0;
+    }
+    
+    /**
+     * check if vertex is upbridge
+     */
+    public static boolean isUpBridgeVertex(ValueStateWritable value){
+        return value.inDegree() == 1 && value.outDegree() > 1;
+    }
+    
+    /**
+     * check if vertex is downbridge
+     */
+    public static boolean isDownBridgeVertex(ValueStateWritable value){
+        return value.inDegree() > 1 && value.outDegree() == 1;
+    }
 }