clear the code and fix messageFlag
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 6870ff1..d1fcd6b 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,31 +31,32 @@
// merge/update directions
public static class DirectionFlag {
- public static final byte DIR_FF = 0b000 << 0;
- public static final byte DIR_FR = 0b001 << 0;
- public static final byte DIR_RF = 0b010 << 0;
- public static final byte DIR_RR = 0b011 << 0;
- public static final byte DIR_NO = 0b111 << 0;
+ public static final byte DIR_NO = 0b000 << 0;
+ public static final byte DIR_FF = 0b001 << 0;
+ public static final byte DIR_FR = 0b010 << 0;
+ public static final byte DIR_RF = 0b011 << 0;
+ public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
}
public static class SpecialVertexFlag extends DirectionFlag {
- public static final byte IS_RANDOMTAIL = 0b00 << 4;
- public static final byte IS_STOP = 0b00 << 4;
- public static final byte IS_HEAD = 0b01 << 4;
- public static final byte IS_FINAL = 0b10 << 4;
- public static final byte IS_RANDOMHEAD = 0b11 << 4;
- public static final byte IS_OLDHEAD = 0b11 << 4;
+ public static final byte IS_NON = 0b00 << 5;
+ public static final byte IS_RANDOMTAIL = 0b00 << 5;
+ public static final byte IS_STOP = 0b00 << 5;
+ public static final byte IS_HEAD = 0b01 << 5;
+ public static final byte IS_FINAL = 0b10 << 5;
+ public static final byte IS_RANDOMHEAD = 0b11 << 5;
+ public static final byte IS_OLDHEAD = 0b11 << 5;
- public static final byte VERTEX_MASK = 0b11 << 4;
+ public static final byte VERTEX_MASK = 0b11 << 5;
public static final byte VERTEX_CLEAR = (byte) 11001111;
}
public static class MergeDirFlag extends SpecialVertexFlag{
- public static final byte NO_MERGE = 0b00 << 2;
- public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 2;
- public static final byte SHOULD_MERGEWITHPREV = 0b10 << 2;
- public static final byte SHOULD_MERGE_MASK = 0b11 << 2;
+ public static final byte NO_MERGE = 0b00 << 3;
+ public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
+ public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
+ public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
}
private PositionWritable nodeID;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 979f5f2..9b3a886 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -21,6 +21,7 @@
private KmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private byte flag;
+ private boolean isFlip;
private byte checkMessage;
@@ -29,6 +30,7 @@
kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
flag = Message.NON;
+ isFlip = false;
checkMessage = (byte) 0;
}
@@ -118,6 +120,14 @@
public void setFlag(byte message) {
this.flag = message;
}
+
+ public boolean isFlip() {
+ return isFlip;
+ }
+
+ public void setFlip(boolean isFlip) {
+ this.isFlip = isFlip;
+ }
@Override
public void write(DataOutput out) throws IOException {
@@ -128,6 +138,7 @@
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
+ out.writeBoolean(isFlip);
out.write(flag);
}
@@ -141,6 +152,7 @@
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
+ isFlip = in.readBoolean();
flag = in.readByte();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 05716fe..cd84272 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -44,6 +44,7 @@
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
outFlag = (byte)0;
outgoingMsg.reset();
+ headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
}
/**
@@ -53,6 +54,10 @@
headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
}
+ public byte getHeadFlag(){
+ return (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+ }
+
/**
* get destination vertex
*/
@@ -188,22 +193,22 @@
*/
public void startSendMsg() {
if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD | MessageFlag.SHOULD_MERGEWITHNEXT));
sendMsgToAllNextNodes(getVertexValue());
voteToHalt();
}
if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD | MessageFlag.SHOULD_MERGEWITHPREV));
sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD | MessageFlag.SHOULD_MERGEWITHNEXT));
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD | MessageFlag.SHOULD_MERGEWITHPREV));
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
@@ -221,7 +226,10 @@
voteToHalt();
} else {
incomingMsg = msgIterator.next();
- getVertexValue().setState(MessageFlag.IS_HEAD);
+ if(getHeadFlag() > 0)
+ voteToHalt();
+ else
+ getVertexValue().setState(incomingMsg.getFlag());
}
}
}
@@ -281,7 +289,7 @@
case MessageFlag.SHOULD_MERGEWITHPREV:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -291,7 +299,7 @@
case MessageFlag.SHOULD_MERGEWITHNEXT:
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -324,7 +332,7 @@
case MessageFlag.DIR_FR:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -335,7 +343,7 @@
case MessageFlag.DIR_RR:
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -356,7 +364,7 @@
case MessageFlag.SHOULD_MERGEWITHNEXT:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -367,7 +375,7 @@
case MessageFlag.SHOULD_MERGEWITHPREV:
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlip(true);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -451,15 +459,11 @@
* updateAdjList
*/
public void processUpdate(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ inFlag = incomingMsg.getFlag();
+ byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
@@ -479,12 +483,7 @@
getVertexValue().setState(state);
}
- boolean flip;
- if((inFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
@@ -497,13 +496,8 @@
public void processMerge(MessageWritable msg){
byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index f70a76e..e702bbd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -101,6 +101,9 @@
* set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
*/
protected boolean setNextInfo(VertexValueWritable value) {
+ if(headFlag > 0 && (getVertexValue().getState() & MessageFlag.SHOULD_MERGEWITHPREV) > 0){
+ return false;
+ }
if (value.getFFList().getCountOfPosition() > 0) {
nextID.set(value.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
@@ -118,6 +121,9 @@
* set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
*/
protected boolean setPrevInfo(VertexValueWritable value) {
+ if(headFlag > 0 && (getVertexValue().getState() & MessageFlag.SHOULD_MERGEWITHNEXT) > 0){
+ return false;
+ }
if (value.getRRList().getCountOfPosition() > 0) {
prevID.set(value.getRRList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
@@ -163,7 +169,8 @@
} else if (hasPrev && !prevHead && (getPreDestVertexId(getVertexValue()) != null)) {
// compress this head to the reverse tail
sendUpdateMsgToSuccessor();
- }
+ } else
+ voteToHalt();
} else {
// I'm a tail
if (hasNext && hasPrev) {
@@ -171,20 +178,24 @@
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
sendUpdateMsgToPredecessor();
- }
+ } else
+ voteToHalt();
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
sendUpdateMsgToPredecessor();
- }
+ } else
+ voteToHalt();
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
sendUpdateMsgToSuccessor();
- }
- }
+ } else
+ voteToHalt();
+ } else
+ voteToHalt();
}
}
}
@@ -193,6 +204,7 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
processUpdate();
+ voteToHalt();
}
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
@@ -205,8 +217,7 @@
processMerge();
//head meets head, stop
- headFlag = (byte) (MessageFlag.VERTEX_MASK & incomingMsg.getFlag());
- if(headFlag == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
+ if(getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index d68c98b..a67d645 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -295,7 +295,7 @@
* @throws IOException
*/
public void broadcastUpdateMsg(){
- switch(getVertexValue().getState() & 0b0001){
+ /* switch(getVertexValue().getState() & 0b0001){
case MessageFlag.SHOULD_MERGEWITHPREV:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
@@ -314,7 +314,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
break;
- }
+ }*/
}
/**
@@ -390,7 +390,7 @@
* updateAdjList
*/
public void processUpdate(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
boolean flip;
@@ -401,14 +401,14 @@
byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));*/
}
/**
* merge and updateAdjList
*/
public void processMerge(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
boolean flip;
@@ -420,7 +420,7 @@
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());
+ kmerSize, incomingMsg.getKmer());*/
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 855deaa..09ea917 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -4,15 +4,11 @@
public class MessageFlag extends MergeDirFlag {
- public static final byte FLIP = 1 << 6;
+ //public static final byte FLIP = 1 << 6;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
- switch (code) {
- case FLIP:
- return "FLIP";
- }
return "ERROR_BAD_MESSAGE";
}
}