fixing P1
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 4cc4527..8b05216 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -96,6 +96,10 @@
return (byte)(getVertexValue().getState() & State.IS_HEAD);
}
+ public boolean isHeadNode(){
+ return getHeadFlag() > 0;
+ }
+
/**
* reset selfFlag
*/
@@ -190,10 +194,6 @@
return killFlag == MessageFlag.KILL & deadFlag == MessageFlag.DIR_FROM_DEADVERTEX;
}
- public boolean isHeadNode(){
- return selfFlag == State.IS_HEAD;
- }
-
public boolean isPathNode(){
return selfFlag != State.IS_HEAD && selfFlag != State.IS_OLDHEAD;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 06f7734..0b010c9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -9,7 +9,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.io.message.PathMergeMessageWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
* vertexId: BytesWritable
@@ -45,7 +44,7 @@
public class P1ForPathMergeVertex extends
BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
- private ArrayList<PathMergeMessageWritable> receivedMsgList = new ArrayList<PathMergeMessageWritable>();
+ private ArrayList<PathMergeMessageWritable> receivedMsg = new ArrayList<PathMergeMessageWritable>();
/**
* initiate kmerSize, maxIteration
*/
@@ -61,7 +60,10 @@
destVertexId = new VKmerBytesWritable();
inFlag = 0;
outFlag = 0;
- receivedMsgList.clear();
+ headMergeDir = getHeadMergeDir();
+ if(repeatKmer == null)
+ repeatKmer = new VKmerBytesWritable();
+ tmpValue.reset();
}
public void chooseDirAndSendMsg(){
@@ -114,25 +116,40 @@
voteToHalt();
} else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- voteToHalt();
- } else if (getSuperstep() % 2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration) {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- receivedMsgList.add(incomingMsg);
- }
- /** if receive two messages, break the symmetric **/
- if(receivedMsgList.size() > 0){
- if(VertexUtil.isPathVertex(getVertexValue()) || canMergeWithHead(receivedMsgList.get(0))){
- /** choose update and merge direction **/
- sendUpdateMsg(receivedMsgList.get(0));
- outFlag = 0;
- sendMergeMsgForP1(receivedMsgList.get(0));
- deleteVertex(getVertexId());
+ else if (getSuperstep() % 4 == 3 && getSuperstep() <= maxIteration) {
+ if(isHeadNode()){
+ byte headMergeDir = (byte)(getVertexValue().getState() & State.HEAD_SHOULD_MERGE_MASK);
+ switch(headMergeDir){
+ case State.HEAD_SHOULD_MERGEWITHPREV:
+ sendUpdateMsgToSuccessor(true);
+ break;
+ case State.HEAD_SHOULD_MERGEWITHNEXT:
+ sendUpdateMsgToPredecessor(true);
+ break;
}
+ } else
+ voteToHalt();
+ } else if (getSuperstep() % 4 == 0 && getSuperstep() <= maxIteration) {
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ processUpdate();
+ if(isHaltNode())
+ voteToHalt();
+ else
+ activate();
}
- voteToHalt();
+ } else if (getSuperstep() % 4 == 1 && getSuperstep() <= maxIteration) {
+ if(isHeadNode())
+ broadcastMergeMsg();
+ } else if (getSuperstep() % 4 == 2 && getSuperstep() <= maxIteration) {
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ receivedMsg.add(incomingMsg);
+ }
+ if(receivedMsg.size() == 2){
+ for(int i = 0; i < 2; i++)
+ processMerge();
+ }
} else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 8354b27..63ae348 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -181,7 +181,7 @@
if(isHaltNode())
voteToHalt();
else
- this.activate();
+ activate();
}
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
@@ -214,7 +214,7 @@
}
else{
getVertexValue().setCounters(counters);
- this.activate();
+ activate();
}
}
}