Half done BFSTraverseVertex Framework
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 65373c2..cd2425b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -12,6 +12,7 @@
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 
 public class MessageWritable implements WritableComparable<MessageWritable>, WritableSizable {
@@ -26,10 +27,11 @@
     private PositionListWritable nodeIdList = new PositionListWritable();
     private float averageCoverage;
     private byte flag;
-    private boolean isFlip;
+    private boolean isFlip; // also used as the odd/even flag (see isEven/setEven)
     private int kmerlength = 0;
     private boolean updateMsg = false;
     private VKmerBytesWritable startVertexId;
+    private VKmerListWritable pathList;
     
     private byte checkMessage;
 
@@ -42,6 +44,7 @@
         flag = Message.NON;
         isFlip = false;
         checkMessage = (byte) 0;
+        pathList = new VKmerListWritable();
     }
     
     public MessageWritable(int kmerSize) {
@@ -77,7 +80,6 @@
             checkMessage |= CheckMessage.START;
             this.startVertexId.setAsCopy(msg.getStartVertexId());
         }
-        checkMessage |= CheckMessage.ADJMSG;
         this.flag = msg.getFlag();
         updateMsg = msg.isUpdateMsg();
     }
@@ -135,7 +137,18 @@
         if (actualKmer != null) {
             checkMessage |= CheckMessage.ACUTUALKMER;
             this.actualKmer.setAsCopy(actualKmer);
+        }
+    }
+    
+    // used for scaffolding: the middle vertex id is stored in the actualKmer field
+    public VKmerBytesWritable getMiddleVertexId() {
+        return actualKmer;
+    }
 
+    public void setMiddleVertexId(VKmerBytesWritable middleKmer) { // a null argument is ignored
+        if (middleKmer != null) {
+            checkMessage |= CheckMessage.ACUTUALKMER; // same bit setActualKmer uses; presumably gates serialization of actualKmer
+            this.actualKmer.setAsCopy(middleKmer); // copies into the shared actualKmer field
         }
     }
     
@@ -147,7 +160,6 @@
         if (actualKmer != null) {
             checkMessage |= CheckMessage.ACUTUALKMER;
             this.actualKmer.setAsCopy(actualKmer);
-
         }
     }
     
@@ -212,6 +224,14 @@
     public void setFlip(boolean isFlip) {
         this.isFlip = isFlip;
     }
+    
+    public boolean isEven() { // odd/even marker is backed by the isFlip field
+        return isFlip;
+    }
+
+    public void setEven(boolean isEven) { // overwrites the isFlip field; the two meanings cannot be used together
+        this.isFlip = isEven;
+    }
 
     
     public boolean isUpdateMsg() {
@@ -233,6 +253,17 @@
         }
     }
 
+    public VKmerListWritable getPathList() { // returns the live list; mutating it does not set CheckMessage.PATHLIST
+        return pathList;
+    }
+
+    public void setPathList(VKmerListWritable pathList) { // a null argument is ignored
+        if(pathList != null){
+            checkMessage |= CheckMessage.PATHLIST; // write()/readFields() only handle pathList when this bit is set
+            this.pathList.setCopy(pathList);
+        }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeInt(kmerlength);
@@ -247,6 +278,8 @@
             nodeIdList.write(out);
         if ((checkMessage & CheckMessage.START) != 0)
             startVertexId.write(out);
+        if ((checkMessage & CheckMessage.PATHLIST) != 0)
+            pathList.write(out);
         out.writeFloat(averageCoverage);
         out.writeBoolean(isFlip);
         out.writeByte(flag); 
@@ -268,6 +301,8 @@
             nodeIdList.readFields(in);
         if ((checkMessage & CheckMessage.START) != 0)
             startVertexId.readFields(in);
+        if ((checkMessage & CheckMessage.PATHLIST) != 0)
+            pathList.readFields(in);
         averageCoverage = in.readFloat();
         isFlip = in.readBoolean();
         flag = in.readByte();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 3036c2e..7c07b33 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -48,6 +48,7 @@
     
     private byte state;
     private boolean isFakeVertex = false;
+    
     private HashMapWritable<VKmerBytesWritable, VKmerListWritable> traverseMap = new HashMapWritable<VKmerBytesWritable, VKmerListWritable>();
 
     public VertexValueWritable() {
@@ -142,9 +143,6 @@
         this.traverseMap.clear();
     }
     
-//    public void reset(int kmerSize) {
-//    }
-    
     @Override
     public void readFields(DataInput in) throws IOException {
         reset();
@@ -174,7 +172,7 @@
         return inDegree() + outDegree();
     }
     
-    /*
+    /**
      * Delete the corresponding edge
      */
     public void processDelete(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete){
@@ -182,7 +180,7 @@
         this.getEdgeList(dir).remove(nodeToDelete);
     }
     
-    /*
+    /**
      * Process any changes to value.  This is for edge updates
      */
     public void processUpdates(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
@@ -194,7 +192,7 @@
         this.getEdgeList(mergeDir).append(nodeToAdd);
     }
     
-    /*
+    /**
      * Process any changes to value.  This is for merging
      */
     public void processMerges(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
@@ -210,4 +208,8 @@
         }
     }
     
+    public boolean hasPathTo(VKmerBytesWritable nodeToSeek){ // true when traverseMap contains nodeToSeek as a key
+        return traverseMap.containsKey(nodeToSeek);
+    }
+    
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java
new file mode 100644
index 0000000..f3abaed
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java
@@ -0,0 +1,243 @@
+package edu.uci.ics.genomix.pregelix.operator.scaffolding;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+
+/**
+ * Work-in-progress BFS traversal vertex for scaffolding. Two end vertices
+ * broadcast to their neighbors; a vertex reached by two messages for the
+ * same read id reports both messages to the fake vertex.
+ */
+public class BFSTraverseVertex extends
+    MapReduceVertex {
+
+    /**
+     * Holder pairing a read id with a middle vertex id.
+     * NOTE(review): not referenced elsewhere in this file yet; presumably the
+     * grouping key for the unfinished fake-vertex branch of compute().
+     */
+    public class pathId{
+        long readId;
+        VKmerBytesWritable middleVertexId;
+
+        public pathId(){
+            readId = 0;
+            middleVertexId = new VKmerBytesWritable();
+        }
+
+        /** Sets both fields; the k-mer is copied into the owned instance. */
+        public void set(long readId, VKmerBytesWritable middleVertexId){
+            this.readId = readId;
+            this.middleVertexId.setAsCopy(middleVertexId);
+        }
+
+        public long getReadId() {
+            return readId;
+        }
+
+        public void setReadId(long readId) {
+            this.readId = readId;
+        }
+
+        public VKmerBytesWritable getMiddleVertexId() {
+            return middleVertexId;
+        }
+
+        /** Copies the k-mer (as set() does) instead of aliasing the caller's object. */
+        public void setMiddleVertexId(VKmerBytesWritable middleVertexId) {
+            this.middleVertexId.setAsCopy(middleVertexId);
+        }
+    }
+
+    private VKmerBytesWritable srcNode = new VKmerBytesWritable();
+    private VKmerBytesWritable destNode = new VKmerBytesWritable();
+    /** Messages received in the current superstep, grouped by read id. */
+    Map<Long, List<MessageWritable>> receivedMsg = new HashMap<Long, List<MessageWritable>>();
+
+    private boolean isFakeVertex = false;
+
+    /**
+     * Lazily initializes kmerSize, maxIteration and the reusable message/k-mer
+     * buffers, resets the per-call buffers, and caches whether this vertex
+     * carries the fake-vertex flag.
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if(incomingMsg == null)
+            incomingMsg = new MessageWritable(kmerSize);
+        if(outgoingMsg == null)
+            outgoingMsg = new MessageWritable(kmerSize);
+        else
+            outgoingMsg.reset(kmerSize);
+        if(reverseKmer == null)
+            reverseKmer = new VKmerBytesWritable();
+        if(kmerList == null)
+            kmerList = new VKmerListWritable();
+        else
+            kmerList.reset();
+        if(fakeVertex == null){
+            fakeVertex = new VKmerBytesWritable();
+            String random = generaterRandomString(kmerSize + 1);
+            fakeVertex.setByRead(kmerSize + 1, random.getBytes(), 0);
+        }
+        if(destVertexId == null)
+            destVertexId = new VKmerBytesWritable(kmerSize);
+        isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0;
+    }
+
+    /**
+     * Drains the iterator and groups the messages by the read id of the first
+     * position in each message's node id list. Every read id gets its own list.
+     * NOTE(review): the message references are stored as-is; if the framework
+     * reuses one MessageWritable across next() calls they must be copied here.
+     */
+    public void aggregateMsgAndGroupedByReadIdInReachedNode(Iterator<MessageWritable> msgIterator){
+        receivedMsg.clear();
+        while(msgIterator.hasNext()){
+            incomingMsg = msgIterator.next();
+            long readId = incomingMsg.getNodeIdList().getPosition(0).getReadId();
+            List<MessageWritable> group = receivedMsg.get(readId);
+            if(group == null){
+                // a fresh list per read id; sharing one list would alias every group
+                group = new ArrayList<MessageWritable>();
+                receivedMsg.put(readId, group);
+            }
+            group.add(incomingMsg);
+        }
+    }
+
+    /** Copies the routing fields of msg into outgoingMsg, tags it odd/even and sends it to the fake vertex. */
+    private void forwardToFakeVertex(MessageWritable msg, boolean isEven){
+        outgoingMsg.reset();
+        outgoingMsg.setSourceVertexId(msg.getSourceVertexId());
+        outgoingMsg.setSeekedVertexId(msg.getSeekedVertexId());
+        outgoingMsg.setPathList(msg.getPathList());
+        outgoingMsg.setNodeIdList(msg.getNodeIdList());
+        outgoingMsg.setMiddleVertexId(getVertexId()); // this vertex is where the message was received
+        outgoingMsg.setEven(isEven);
+        sendMsg(fakeVertex, outgoingMsg);
+    }
+
+    /** Forwards msg to the fake vertex with the even flag cleared. */
+    public void sendOddMsgToFakeNode(MessageWritable msg){
+        forwardToFakeVertex(msg, false);
+    }
+
+    /** Forwards msg to the fake vertex with the even flag set. */
+    public void sendEvenMsgToFakeNode(MessageWritable msg){
+        forwardToFakeVertex(msg, true);
+    }
+
+    /**
+     * First BFS step at an end vertex: this vertex becomes the source and the
+     * path starts with its own id. The path is installed via setPathList so the
+     * PATHLIST flag is set; appending through getPathList() alone leaves the flag
+     * clear and MessageWritable.write() would skip the path.
+     */
+    public void initialBroadcaseBFSTraverse(){
+        outgoingMsg.setSourceVertexId(getVertexId());
+        outgoingMsg.setSeekedVertexId(incomingMsg.getSeekedVertexId());
+        kmerList.reset();
+        kmerList.append(getVertexId());
+        outgoingMsg.setPathList(kmerList);
+        outgoingMsg.setNodeIdList(incomingMsg.getNodeIdList()); //only one readId
+        outgoingMsg.setEven(true);
+        sendMsgToAllNeighborNodes(getVertexValue());
+        //add footprint
+        getVertexValue().getTraverseMap().put(getVertexId(), null);
+    }
+
+    /**
+     * Later BFS step: keeps the original source, extends the incoming path with
+     * this vertex and forwards to all neighbors. setPathList is used first so the
+     * PATHLIST flag is set before the append.
+     * NOTE(review): the footprint is keyed by this vertex's id while compute()
+     * probes hasPathTo() with the seeked id -- confirm the intended key.
+     */
+    public void broadcastBFSTraverse(){
+        outgoingMsg.setSourceVertexId(incomingMsg.getSourceVertexId());
+        outgoingMsg.setSeekedVertexId(incomingMsg.getSeekedVertexId());
+        outgoingMsg.setPathList(incomingMsg.getPathList());
+        outgoingMsg.getPathList().append(getVertexId());
+        outgoingMsg.setNodeIdList(incomingMsg.getNodeIdList()); //only one readId
+        outgoingMsg.setEven(true);
+        sendMsgToAllNeighborNodes(getVertexValue());
+        //add footprint
+        getVertexValue().getTraverseMap().put(getVertexId(), null);
+    }
+
+    /**
+     * Superstep 1 adds the fake vertex; 2 seeds the two end vertices (placeholder
+     * k-mers for now); 3 starts the BFS at each seeded vertex; 4 handles the
+     * messages that reached this vertex. The fake-vertex side is not implemented.
+     */
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if(getSuperstep() == 1){
+            addFakeVertex();
+            voteToHalt();
+        }
+        else if(getSuperstep() == 2){
+            kmerList.append(new VKmerBytesWritable("Kmer1"));
+            kmerList.append(new VKmerBytesWritable("Kmer2"));
+            /** initiate two nodes -- srcNode and destNode **/
+            srcNode.setAsCopy(kmerList.getPosition(0));
+            destNode.setAsCopy(kmerList.getPosition(1));
+            // outgoingMsg.setNodeIdList(); set as common readId
+            outgoingMsg.setSeekedVertexId(destNode);
+            sendMsg(srcNode, outgoingMsg);
+            outgoingMsg.setSeekedVertexId(srcNode);
+            sendMsg(destNode, outgoingMsg);
+        } else if(getSuperstep() == 3){
+            if(!isFakeVertex){
+                if(msgIterator.hasNext()){
+                    incomingMsg = msgIterator.next();
+                    /** begin to BFS **/
+                    initialBroadcaseBFSTraverse();
+                }
+                voteToHalt();
+            }
+        } else if(getSuperstep() == 4){
+            if(!isFakeVertex){
+                /** aggregate message & grouped by readId**/
+                aggregateMsgAndGroupedByReadIdInReachedNode(msgIterator);
+                /** process receivedMsg  **/
+                for(List<MessageWritable> group : receivedMsg.values()){
+                    /** |msg| == 2, two msg meet in the same node **/
+                    if(group.size() == 2){
+                        /** Aggregate both msgs to Fake Node and mark flag as odd **/
+                        sendOddMsgToFakeNode(group.get(0));
+                        sendOddMsgToFakeNode(group.get(1));
+                        //add footprint
+                        getVertexValue().getTraverseMap().put(getVertexId(), null);
+                    } else if(group.size() == 1){
+                        // work on this group's message, not on whatever the iterator returned last
+                        incomingMsg = group.get(0);
+                        if(getVertexValue().hasPathTo(incomingMsg.getSeekedVertexId())){
+                            sendEvenMsgToFakeNode(incomingMsg);
+                        } else{
+                            broadcastBFSTraverse();
+                        }
+                    }
+                }
+                voteToHalt();
+            } else{ //FakeVertex receives and processes Msg
+                /** aggregate message & grouped by readId and middleVertex **/
+            }
+        }
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 4b32a51..39282ab 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -5,7 +5,7 @@
     public static final byte SOURCE = 1 << 0;
     public static final byte ACUTUALKMER = 1 << 1;
     public static final byte NEIGHBER = 1 << 2;
-    public static final byte MESSAGE = 1 << 3;
+    public static final byte PATHLIST = 1 << 3;
     public static final byte NODEIDLIST = 1 << 4;
     public static final byte ADJMSG = 1 << 5;
     public static final byte START = 1 << 6;
@@ -24,8 +24,8 @@
                 case NEIGHBER:
                     r = "NEIGHBER";
                     break;
-                case MESSAGE:
-                    r = "MESSAGE";
+                case PATHLIST:
+                    r = "PATHLIST";
                     break;
                 case NODEIDLIST:
                     r = "READID";