Half-done BFSTraverseVertex framework
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 65373c2..cd2425b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -12,6 +12,7 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.pregelix.api.io.WritableSizable;
public class MessageWritable implements WritableComparable<MessageWritable>, WritableSizable {
@@ -26,10 +27,11 @@
private PositionListWritable nodeIdList = new PositionListWritable();
private float averageCoverage;
private byte flag;
- private boolean isFlip;
+ private boolean isFlip; // also used as the odd/even marker for scaffolding (see isEven/setEven)
private int kmerlength = 0;
private boolean updateMsg = false;
private VKmerBytesWritable startVertexId;
+ private VKmerListWritable pathList; // traversal path; written/read only when the CheckMessage.PATHLIST bit is set
private byte checkMessage;
@@ -42,6 +44,7 @@
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
+ pathList = new VKmerListWritable(); // NOTE(review): confirm reset(...) also clears pathList, or a reused message may carry a stale path
}
public MessageWritable(int kmerSize) {
@@ -77,7 +80,6 @@
checkMessage |= CheckMessage.START;
this.startVertexId.setAsCopy(msg.getStartVertexId());
}
- checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
updateMsg = msg.isUpdateMsg();
}
@@ -135,7 +137,18 @@
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
this.actualKmer.setAsCopy(actualKmer);
+ }
+ }
+
+ // used for scaffolding: the middle vertex id is stored in the actualKmer field and flagged with CheckMessage.ACUTUALKMER
+ public VKmerBytesWritable getMiddleVertexId() {
+ return actualKmer;
+ }
+ public void setMiddleVertexId(VKmerBytesWritable middleKmer) {
+ if (middleKmer != null) {
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.actualKmer.setAsCopy(middleKmer);
}
}
@@ -147,7 +160,6 @@
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
this.actualKmer.setAsCopy(actualKmer);
-
}
}
@@ -212,6 +224,14 @@
public void setFlip(boolean isFlip) {
this.isFlip = isFlip;
}
+
+ public boolean isEven() { // scaffolding alias: reads the isFlip field
+ return isFlip;
+ }
+
+ public void setEven(boolean isEven) { // scaffolding alias: writes the isFlip field (same as setFlip)
+ this.isFlip = isEven;
+ }
public boolean isUpdateMsg() {
@@ -233,6 +253,17 @@
}
}
+ public VKmerListWritable getPathList() { // NOTE(review): returns the live list; appending to it does not set CheckMessage.PATHLIST, so the change is not serialized unless setPathList() is called
+ return pathList;
+ }
+
+ public void setPathList(VKmerListWritable pathList) { // copies the list and sets CheckMessage.PATHLIST; null is ignored
+ if(pathList != null){
+ checkMessage |= CheckMessage.PATHLIST;
+ this.pathList.setCopy(pathList);
+ }
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -247,6 +278,8 @@
nodeIdList.write(out);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.write(out);
+ if ((checkMessage & CheckMessage.PATHLIST) != 0) // pathList is optional on the wire
+ pathList.write(out);
out.writeFloat(averageCoverage);
out.writeBoolean(isFlip);
out.writeByte(flag);
@@ -268,6 +301,8 @@
nodeIdList.readFields(in);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.readFields(in);
+ if ((checkMessage & CheckMessage.PATHLIST) != 0) // must mirror the field order used in write()
+ pathList.readFields(in);
averageCoverage = in.readFloat();
isFlip = in.readBoolean();
flag = in.readByte();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 3036c2e..7c07b33 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -48,6 +48,7 @@
private byte state;
private boolean isFakeVertex = false;
+
private HashMapWritable<VKmerBytesWritable, VKmerListWritable> traverseMap = new HashMapWritable<VKmerBytesWritable, VKmerListWritable>();
public VertexValueWritable() {
@@ -142,9 +143,6 @@
this.traverseMap.clear();
}
-// public void reset(int kmerSize) {
-// }
-
@Override
public void readFields(DataInput in) throws IOException {
reset();
@@ -174,7 +172,7 @@
return inDegree() + outDegree();
}
- /*
+ /**
* Delete the corresponding edge
*/
public void processDelete(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete){
@@ -182,7 +180,7 @@
this.getEdgeList(dir).remove(nodeToDelete);
}
- /*
+ /**
* Process any changes to value. This is for edge updates
*/
public void processUpdates(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
@@ -194,7 +192,7 @@
this.getEdgeList(mergeDir).append(nodeToAdd);
}
- /*
+ /**
* Process any changes to value. This is for merging
*/
public void processMerges(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
@@ -210,4 +208,8 @@
}
}
+ public boolean hasPathTo(VKmerBytesWritable nodeToSeek){ // true iff nodeToSeek is a key of traverseMap (the value is not inspected)
+ return traverseMap.containsKey(nodeToSeek);
+ }
+
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java
new file mode 100644
index 0000000..f3abaed
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/BFSTraverseVertex.java
@@ -0,0 +1,263 @@
+package edu.uci.ics.genomix.pregelix.operator.scaffolding;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+
+/**
+ * Work-in-progress scaffolding vertex: a bidirectional BFS between two seed vertices
+ * (srcNode and destNode) whose meeting points are reported to the fake vertex.
+ *
+ * Superstep schedule as implemented in compute():
+ *   1 - create the fake vertex;
+ *   2 - send a seed message to srcNode and to destNode;
+ *   3 - each seeded vertex starts broadcasting to its neighbours;
+ *   4 - reached vertices group messages by readId and either report to the fake
+ *       vertex or keep broadcasting.
+ *
+ * The fake-vertex side of the protocol is not implemented yet; see the TODOs in
+ * compute().
+ */
+public class BFSTraverseVertex extends
+        MapReduceVertex {
+
+    /**
+     * Pairs a readId with the vertex at which two BFS frontiers met.
+     * Static because it never uses the enclosing vertex instance.
+     */
+    public static class pathId {
+        long readId;
+        VKmerBytesWritable middleVertexId;
+
+        public pathId() {
+            readId = 0;
+            middleVertexId = new VKmerBytesWritable();
+        }
+
+        public void set(long readId, VKmerBytesWritable middleVertexId) {
+            this.readId = readId;
+            this.middleVertexId.setAsCopy(middleVertexId);
+        }
+
+        public long getReadId() {
+            return readId;
+        }
+
+        public void setReadId(long readId) {
+            this.readId = readId;
+        }
+
+        public VKmerBytesWritable getMiddleVertexId() {
+            return middleVertexId;
+        }
+
+        /** Copies the id, like {@link #set}, instead of aliasing a possibly reused Writable. */
+        public void setMiddleVertexId(VKmerBytesWritable middleVertexId) {
+            this.middleVertexId.setAsCopy(middleVertexId);
+        }
+    }
+
+    private VKmerBytesWritable srcNode = new VKmerBytesWritable();
+    private VKmerBytesWritable destNode = new VKmerBytesWritable();
+    /** Scratch list used to build outgoing paths without mutating message internals. */
+    private VKmerListWritable pathBuffer = new VKmerListWritable();
+    /** Messages received in the current superstep, grouped by readId (one list per readId). */
+    Map<Long, List<MessageWritable>> receivedMsg = new HashMap<Long, List<MessageWritable>>();
+
+    private boolean isFakeVertex = false;
+
+    /**
+     * Initializes kmerSize, maxIteration, the reusable message/kmer objects and the fake
+     * vertex id, and caches whether this vertex is the fake vertex.
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if (incomingMsg == null)
+            incomingMsg = new MessageWritable(kmerSize);
+        if (outgoingMsg == null)
+            outgoingMsg = new MessageWritable(kmerSize);
+        else
+            outgoingMsg.reset(kmerSize);
+        if (reverseKmer == null)
+            reverseKmer = new VKmerBytesWritable();
+        if (kmerList == null)
+            kmerList = new VKmerListWritable();
+        else
+            kmerList.reset();
+        if (fakeVertex == null) {
+            fakeVertex = new VKmerBytesWritable();
+            String random = generaterRandomString(kmerSize + 1);
+            fakeVertex.setByRead(kmerSize + 1, random.getBytes(), 0);
+        }
+        if (destVertexId == null)
+            destVertexId = new VKmerBytesWritable(kmerSize);
+        isFakeVertex = ((byte) getVertexValue().getState() & State.FAKEFLAG_MASK) > 0;
+    }
+
+    /**
+     * Groups this superstep's messages by the readId of the first position in their
+     * node-id list; every readId gets its own list in receivedMsg. Leaves incomingMsg
+     * pointing at the last message read.
+     * NOTE(review): if the framework reuses one MessageWritable across next() calls, the
+     * stored references alias each other and must be deep-copied instead - confirm.
+     */
+    public void aggregateMsgAndGroupedByReadIdInReachedNode(Iterator<MessageWritable> msgIterator) {
+        receivedMsg.clear();
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            long readId = incomingMsg.getNodeIdList().getPosition(0).getReadId();
+            List<MessageWritable> group = receivedMsg.get(readId);
+            if (group == null) {
+                group = new ArrayList<MessageWritable>();
+                receivedMsg.put(readId, group);
+            }
+            group.add(incomingMsg);
+        }
+    }
+
+    /** Forwards msg to the fake vertex with this vertex as middle vertex and the given parity. */
+    private void sendMsgToFakeNode(MessageWritable msg, boolean isEven) {
+        outgoingMsg.reset();
+        outgoingMsg.setSourceVertexId(msg.getSourceVertexId());
+        outgoingMsg.setSeekedVertexId(msg.getSeekedVertexId());
+        outgoingMsg.setPathList(msg.getPathList());
+        outgoingMsg.setNodeIdList(msg.getNodeIdList());
+        outgoingMsg.setMiddleVertexId(getVertexId());
+        outgoingMsg.setEven(isEven);
+        sendMsg(fakeVertex, outgoingMsg);
+    }
+
+    /** Reports msg to the fake vertex flagged as odd. */
+    public void sendOddMsgToFakeNode(MessageWritable msg) {
+        sendMsgToFakeNode(msg, false);
+    }
+
+    /** Reports msg to the fake vertex flagged as even. */
+    public void sendEvenMsgToFakeNode(MessageWritable msg) {
+        sendMsgToFakeNode(msg, true);
+    }
+
+    /** Seed step: this vertex becomes the BFS source and starts a new path with itself. */
+    public void initialBroadcaseBFSTraverse() {
+        broadcastAlongPath(getVertexId(), null, incomingMsg);
+    }
+
+    /** Continues the BFS for incomingMsg (the last message read in this superstep). */
+    public void broadcastBFSTraverse() {
+        broadcastBFSTraverse(incomingMsg);
+    }
+
+    /** Continues the BFS for msg, extending its path with this vertex. */
+    public void broadcastBFSTraverse(MessageWritable msg) {
+        broadcastAlongPath(msg.getSourceVertexId(), msg.getPathList(), msg);
+    }
+
+    /**
+     * Sends a BFS message carrying pathSoFar + this vertex to all neighbours, then marks this
+     * vertex as visited. The path is passed through setPathList() so the PATHLIST bit is set
+     * and the path is actually serialized.
+     *
+     * @param sourceId  BFS source recorded in the outgoing message
+     * @param pathSoFar path received so far, or null to start a new path
+     * @param msg       message supplying the seeked vertex id and the node-id list
+     */
+    private void broadcastAlongPath(VKmerBytesWritable sourceId, VKmerListWritable pathSoFar,
+            MessageWritable msg) {
+        if (pathSoFar == null)
+            pathBuffer.reset();
+        else
+            pathBuffer.setCopy(pathSoFar);
+        pathBuffer.append(getVertexId());
+
+        outgoingMsg.reset(); // drop fields left over from earlier sends in this compute()
+        outgoingMsg.setSourceVertexId(sourceId);
+        outgoingMsg.setSeekedVertexId(msg.getSeekedVertexId());
+        outgoingMsg.setPathList(pathBuffer);
+        outgoingMsg.setNodeIdList(msg.getNodeIdList()); // only one readId
+        outgoingMsg.setEven(true);
+        sendMsgToAllNeighborNodes(getVertexValue());
+        addFootprint();
+    }
+
+    /**
+     * Marks this vertex as visited in its own traverse map, keyed by a copy of its id and
+     * with an empty (non-null) list as value.
+     * NOTE(review): assumes HashMapWritable writes every value and would fail on null - confirm.
+     */
+    private void addFootprint() {
+        VKmerBytesWritable key = new VKmerBytesWritable();
+        key.setAsCopy(getVertexId());
+        getVertexValue().getTraverseMap().put(key, new VKmerListWritable());
+    }
+
+    /**
+     * Handles the messages of one readId that reached this vertex: two messages mean both
+     * frontiers met here (both are reported as odd); a single message is reported as even if
+     * this vertex already has a path to the seeked vertex, otherwise the BFS continues.
+     */
+    private void processReadGroup(List<MessageWritable> group) {
+        if (group.size() == 2) {
+            sendOddMsgToFakeNode(group.get(0));
+            sendOddMsgToFakeNode(group.get(1));
+            addFootprint();
+        } else if (group.size() == 1) {
+            MessageWritable msg = group.get(0);
+            if (getVertexValue().hasPathTo(msg.getSeekedVertexId()))
+                sendEvenMsgToFakeNode(msg);
+            else
+                broadcastBFSTraverse(msg);
+        }
+    }
+
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1) {
+            addFakeVertex();
+            voteToHalt();
+        } else if (getSuperstep() == 2) {
+            // TODO: placeholder seed ids; the real srcNode/destNode kmers are not wired in yet
+            kmerList.append(new VKmerBytesWritable("Kmer1"));
+            kmerList.append(new VKmerBytesWritable("Kmer2"));
+            // initiate two nodes -- srcNode and destNode
+            srcNode.setAsCopy(kmerList.getPosition(0));
+            destNode.setAsCopy(kmerList.getPosition(1));
+            // TODO: outgoingMsg.setNodeIdList(...) with the common readId; superstep 4 reads
+            // position 0 of the node-id list of every message it receives
+            outgoingMsg.setSeekedVertexId(destNode);
+            sendMsg(srcNode, outgoingMsg);
+            outgoingMsg.setSeekedVertexId(srcNode);
+            sendMsg(destNode, outgoingMsg);
+        } else if (getSuperstep() == 3) {
+            if (!isFakeVertex) {
+                if (msgIterator.hasNext()) {
+                    incomingMsg = msgIterator.next();
+                    // begin to BFS
+                    initialBroadcaseBFSTraverse();
+                }
+                voteToHalt();
+            }
+        } else if (getSuperstep() > 3 && getSuperstep() < 5) { // currently only superstep 4
+            if (!isFakeVertex) {
+                // aggregate messages grouped by readId, then process each readId independently
+                aggregateMsgAndGroupedByReadIdInReachedNode(msgIterator);
+                for (List<MessageWritable> group : receivedMsg.values())
+                    processReadGroup(group);
+                voteToHalt();
+            } else { // FakeVertex receives and processes Msg
+                // TODO: aggregate messages grouped by readId and middleVertex
+            }
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 4b32a51..39282ab 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -5,7 +5,7 @@
public static final byte SOURCE = 1 << 0;
public static final byte ACUTUALKMER = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
- public static final byte MESSAGE = 1 << 3;
+ public static final byte PATHLIST = 1 << 3; // takes over the bit formerly named MESSAGE; NOTE(review): any remaining CheckMessage.MESSAGE references elsewhere must be renamed
public static final byte NODEIDLIST = 1 << 4;
public static final byte ADJMSG = 1 << 5;
public static final byte START = 1 << 6;
@@ -24,8 +24,8 @@
case NEIGHBER:
r = "NEIGHBER";
break;
- case MESSAGE:
- r = "MESSAGE";
+ case PATHLIST:
+ r = "PATHLIST";
break;
case NODEIDLIST:
r = "READID";