some updates
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 130d1cf..27518b4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -12,6 +12,7 @@
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
@@ -74,12 +75,15 @@
private byte headFlag;
private byte tailFlag;
private byte outFlag;
+ private byte outUpFlag;
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
+ private MessageWritable outgoingUpMsg = new MessageWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
+ private AdjacencyListWritable neighberNode = new AdjacencyListWritable();
/**
* initiate kmerSize, maxIteration
*/
@@ -249,6 +253,54 @@
//voteToHalt();
}
+ /**
+ * send update message from predecessor
+ */
+ public void sendUpMsgFromPredecessor(){
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
+ if(getVertexValue().getFRList().getLength() > 0){
+ outFlag |= MessageFlag.FROM_REWARDLIST;
+ }
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setMessage(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ outUpFlag = (byte)(MessageFlag.FROM_DEADVERTEX | MessageFlag.FROM_SUCCESSOR);
+ if(getVertexValue().getRFList().getLength() > 0){
+ outUpFlag |= MessageFlag.FROM_REWARDLIST;
+ outgoingUpMsg.setNeighberNode(getVertexValue().getIncomingList());
+ } else
+ outgoingUpMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingUpMsg.setMessage(outUpFlag);
+ outgoingUpMsg.setSourceVertexId(getVertexId());
+
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingUpMsg);
+ deleteVertex(getVertexId());
+ }
+
+ /**
+ * send update message from successor
+ */
+ public void sendUpMsgFromSuccessor(){
+ outFlag |= MessageFlag.FROM_SUCCESSOR;
+ if(getVertexValue().getRFList().getLength() > 0){
+ outFlag |= MessageFlag.FROM_REWARDLIST;
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ } else
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setMessage(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ outUpFlag = (byte)(MessageFlag.FROM_DEADVERTEX | MessageFlag.FROM_PREDECESSOR);
+ if(getVertexValue().getFRList().getLength() > 0){
+ outUpFlag |= MessageFlag.FROM_REWARDLIST;
+ }
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingUpMsg.setMessage(outUpFlag);
+ outgoingUpMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingUpMsg);
+ deleteVertex(getVertexId());
+ }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -256,7 +308,7 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else{
+ else if (getSuperstep() % 2 == 1){
// Node may be marked as head b/c it's a real head or a real tail
headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
@@ -280,20 +332,10 @@
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- deleteVertex(getVertexId());
+ sendUpMsgFromPredecessor();
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- deleteVertex(getVertexId());
+ sendUpMsgFromSuccessor();
}
} else {
// I'm a tail
@@ -301,39 +343,38 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- deleteVertex(getVertexId());
+ sendUpMsgFromPredecessor();
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- deleteVertex(getVertexId());
+ sendUpMsgFromPredecessor();
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- deleteVertex(getVertexId());
+ sendUpMsgFromSuccessor();
}
}
}
}
-
+ }
+ else if (getSuperstep() % 2 == 0){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ outFlag = incomingMsg.getMessage();
+ if((outFlag & MessageFlag.FROM_DEADVERTEX) > 0){
+ if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
+ if((outFlag & MessageFlag.FROM_REWARDLIST) > 0){
+
+ } else {
+ getVertexValue().setIncomingList(incomingMsg.getNeighberNode());
+ }
+ }
+ }
+ }
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index e83598a..de349ad 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -1,28 +1,32 @@
package edu.uci.ics.genomix.pregelix.type;
public class MessageFlag {
- public static final byte EMPTY_MESSAGE = 0;
- public static final byte FROM_SELF = 1;
- public static final byte FROM_SUCCESSOR = 1 << 1;
- public static final byte FROM_PREDECESSOR = 1 << 2;
- public static final byte IS_HEAD = 1 << 3;
- public static final byte IS_TAIL = 1 << 4;
-
+ public static final byte FROM_SELF = 0;
+ public static final byte IS_HEAD = 1 << 1;
+ public static final byte IS_TAIL = 1 << 2;
+ public static final byte FROM_DEADVERTEX = 1 << 3;
+ //public static final byte FROM_FORWARDLIST = 1 << 4;
+ public static final byte FROM_REWARDLIST = 1 << 4;
+ public static final byte FROM_SUCCESSOR = 1 << 5;
+ public static final byte FROM_PREDECESSOR = 1 << 6;
+
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
- case EMPTY_MESSAGE:
- return "EMPTY_MESSAGE";
case FROM_SELF:
return "FROM_SELF";
- case FROM_SUCCESSOR:
- return "FROM_SUCCESSOR";
- case FROM_PREDECESSOR:
- return "FROM_PREDECESSOR";
case IS_HEAD:
return "IS_HEAD";
case IS_TAIL:
return "IS_TAIL";
+ case FROM_DEADVERTEX:
+ return "FROM_DEADVERTEX";
+ case FROM_REWARDLIST:
+ return "FROM_REWARDLIST";
+ case FROM_SUCCESSOR:
+ return "FROM_SUCCESSOR";
+ case FROM_PREDECESSOR:
+ return "FROM_PREDECESSOR";
}
return "ERROR_BAD_MESSAGE";
}