add stop condition for p2
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 61bb08f..8e234e9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -248,6 +248,8 @@
newState |= MessageFlag.IS_OLDHEAD;
getVertexValue().setState(newState);
outFlag |= MessageFlag.IS_HEAD;
+ } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+ outFlag |= MessageFlag.IS_OLDHEAD;
}
byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
@@ -261,7 +263,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
case MessageFlag.DIR_RR:
@@ -272,7 +274,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
}
@@ -414,6 +416,25 @@
kmerSize, incomingMsg.getChainVertexId());
}
+ /**
+ * merge and updateAdjList having parameter
+ */
+ public void processMerge(MessageWritable msg){
+ byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
+ kmerSize, msg.getChainVertexId());
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 1f354b8..4f1cefb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -1,18 +1,16 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+import java.util.ArrayList;
import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -51,8 +49,10 @@
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
-
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
+
byte headFlag;
+ byte oldHeadFlag;
/**
* initiate kmerSize, maxIteration
*/
@@ -81,6 +81,32 @@
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
}
+
+ /**
+ * check received message
+ */
+ public byte checkNumOfMsgsFromHead(){
+ int countHead = 0;
+ int countOldHead = 0;
+ for(int i = 0; i < receivedMsgList.size(); i++){
+ if((byte)(receivedMsgList.get(i).getFlag() | MessageFlag.IS_HEAD) > 0)
+ countHead++;
+ if((byte)(receivedMsgList.get(i).getFlag() | MessageFlag.IS_OLDHEAD) > 0)
+ countOldHead++;
+ }
+ if(countHead == 0 && countOldHead == 0)
+ return MessageFromHead.BothMsgsFromNonHead;
+ else if(countHead == 2)
+ return MessageFromHead.BothMsgsFromHead;
+ else if(countOldHead == 2)
+ return MessageFromHead.BothMsgsFromOldHead;
+ else if(countHead == 1)
+ return MessageFromHead.OneMsgFromHead;
+ else if(countOldHead == 1)
+ return MessageFromHead.OneMsgFromNonHead;
+
+ return MessageFromHead.NO_INFO;
+ }
/**
* head send message to path
@@ -89,14 +115,40 @@
//process merge when receiving msg
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- processMerge();
- headFlag = (byte)(incomingMsg.getFlag() | MessageFlag.IS_HEAD);
- if(headFlag > 0)
- getVertexValue().setState(MessageFlag.IS_HEAD);
+ receivedMsgList.add(incomingMsg);
}
+ if(receivedMsgList.size() != 0){
+ byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+ switch(numOfMsgsFromHead){
+ case MessageFromHead.BothMsgsFromNonHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ break;
+ case MessageFromHead.BothMsgsFromHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ break;
+ case MessageFromHead.BothMsgsFromOldHead:
+ deleteVertex(getVertexId());
+ break;
+ case MessageFromHead.OneMsgFromHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ break;
+ case MessageFromHead.OneMsgFromNonHead:
+ //halt
+ voteToHalt();
+ break;
+ }
+ } else
+ voteToHalt();
//send out wantToMerge msg
headFlag = (byte)(getVertexValue().getState() | MessageFlag.IS_HEAD);
- if(headFlag == 0)
+ oldHeadFlag = (byte)(getVertexValue().getState() | MessageFlag.IS_OLDHEAD);
+ outFlag = (byte)(headFlag | oldHeadFlag);
+ if(outFlag == 0)
sendOutMsg();
}
@@ -134,7 +186,7 @@
initState(msgIterator);
else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- voteToHalt();
+ //voteToHalt();
} else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
voteToHalt();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 83614e7..c130ec6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -6,8 +6,9 @@
public static final byte FLIP = 1 << 3;
public static final byte IS_HEAD = 1 << 4;
- public static final byte IS_OLDHEAD = 1 << 5;
public static final byte IS_TAIL = 1 << 5;
+ public static final byte IS_OLDHEAD = 1 << 6;
+ public static final byte IS_FINAL = 0b000011;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
new file mode 100644
index 0000000..e7a0ff1
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class MessageFromHead {
+ public static final byte BothMsgsFromHead = 1 << 0;
+ public static final byte BothMsgsFromNonHead = 2 << 0;
+ public static final byte BothMsgsFromOldHead = 3 << 0;
+ public static final byte OneMsgFromHead = 4 << 1;
+ public static final byte OneMsgFromNonHead = 5 << 1;
+
+ public static final byte NO_INFO = 0 << 0;
+}