Apply undirected graph algorithm in p2
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 2c4ac0d..291839c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,17 +31,31 @@
// merge/update directions
public static class DirectionFlag {
- public static final byte DIR_FF = 0b00 << 0;
- public static final byte DIR_FR = 0b01 << 0;
- public static final byte DIR_RF = 0b10 << 0;
- public static final byte DIR_RR = 0b11 << 0;
- public static final byte DIR_MASK = 0b11 << 0;
+ public static final byte DIR_FF = 0b000 << 0;
+ public static final byte DIR_FR = 0b001 << 0;
+ public static final byte DIR_RF = 0b010 << 0;
+ public static final byte DIR_RR = 0b011 << 0;
+ public static final byte DIR_NO = 0b111 << 0;
+ public static final byte DIR_MASK = 0b111 << 0;
}
- public static class MergeDirFlag extends DirectionFlag{
- public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2;
- public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2;
- public static final byte SHOULD_MERGE_MASK = 0b1 << 2;
+ public static class SpecialVertexFlag extends DirectionFlag {
+ public static final byte IS_RANDOMTAIL = 0b00 << 4;
+ public static final byte IS_STOP = 0b00 << 4;
+ public static final byte IS_HEAD = 0b01 << 4;
+ public static final byte IS_FINAL = 0b10 << 4;
+ public static final byte IS_RANDOMHEAD = 0b11 << 4;
+ public static final byte IS_OLDHEAD = 0b11 << 4;
+
+ public static final byte VERTEX_MASK = 0b11 << 4;
+ public static final byte VERTEX_CLEAR = (byte) 11001111;
+ }
+
+ public static class MergeDirFlag extends SpecialVertexFlag{
+ public static final byte NO_MERGE = 0b00 << 2;
+ public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 2;
+ public static final byte SHOULD_MERGEWITHPREV = 0b10 << 2;
+ public static final byte SHOULD_MERGE_MASK = 0b11 << 2;
}
private PositionWritable nodeID;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 8620445..ebcf7f0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -143,7 +143,7 @@
return;
}
}
- throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+ //throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
}
public void set(PositionListWritable list2) {
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-0 b/genomix/genomix-pregelix/data/input/pathmerge/part-0
new file mode 100755
index 0000000..56056ab
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-1 b/genomix/genomix-pregelix/data/input/pathmerge/part-1
new file mode 100755
index 0000000..95701ce
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-2 b/genomix/genomix-pregelix/data/input/pathmerge/part-2
new file mode 100755
index 0000000..6bb6633
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-3 b/genomix/genomix-pregelix/data/input/pathmerge/part-3
new file mode 100755
index 0000000..caa63de
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-0 b/genomix/genomix-pregelix/data/input/read2/part-0
new file mode 100755
index 0000000..1620187
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-1 b/genomix/genomix-pregelix/data/input/read2/part-1
new file mode 100755
index 0000000..d2e2476
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-2 b/genomix/genomix-pregelix/data/input/read2/part-2
new file mode 100755
index 0000000..7f3575e
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-3 b/genomix/genomix-pregelix/data/input/read2/part-3
new file mode 100755
index 0000000..03e23e1
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index c4365ff..9e2e5f0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,6 +8,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -35,7 +36,8 @@
@Override
public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ if(vertex.getVertexValue().getState() != MessageFlag.IS_OLDHEAD)
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 162663f..c8d1852 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -29,6 +29,8 @@
protected Iterator<PositionWritable> posIterator;
byte headFlag;
protected byte outFlag;
+ protected byte inFlag;
+ protected byte selfFlag;
/**
* initiate kmerSize, maxIteration
@@ -207,8 +209,10 @@
public void setSuccessorAdjMsg(){
if(getVertexValue().getFFList().getLength() > 0)
outFlag |= MessageFlag.DIR_FF;
- else
+ else if(getVertexValue().getFRList().getLength() > 0)
outFlag |= MessageFlag.DIR_FR;
+ else
+ outFlag |= MessageFlag.DIR_NO;
}
/**
@@ -217,8 +221,10 @@
public void setPredecessorAdjMsg(){
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
- else
+ else if(getVertexValue().getRRList().getLength() > 0)
outFlag |= MessageFlag.DIR_RR;
+ else
+ outFlag |= MessageFlag.DIR_NO;
}
/**
@@ -257,14 +263,14 @@
* @throws IOException
*/
public void sendMergeMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+ if(selfFlag == MessageFlag.IS_HEAD){
byte newState = getVertexValue().getState();
newState &= ~MessageFlag.IS_HEAD;
newState |= MessageFlag.IS_OLDHEAD;
getVertexValue().setState(newState);
+ resetSelfFlag();
outFlag |= MessageFlag.IS_HEAD;
- voteToHalt();
- } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+ } else if(selfFlag == MessageFlag.IS_OLDHEAD){
outFlag |= MessageFlag.IS_OLDHEAD;
voteToHalt();
}
@@ -301,7 +307,7 @@
* @throws IOException
*/
public void broadcastMergeMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ if(headFlag > 0)
outFlag |= MessageFlag.IS_HEAD;
switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
case MessageFlag.SHOULD_MERGEWITHNEXT:
@@ -313,6 +319,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ deleteVertex(getVertexId());
break;
case MessageFlag.SHOULD_MERGEWITHPREV:
setPredecessorAdjMsg();
@@ -323,6 +330,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ deleteVertex(getVertexId());
break;
}
}
@@ -418,11 +426,18 @@
* merge and updateAdjList merge with one neighbor
*/
public void processMerge(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ inFlag = incomingMsg.getFlag();
+ byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ if((inFlag & MessageFlag.IS_HEAD) > 0){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.IS_HEAD;
+ getVertexValue().setState(state);
+ }
+
boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
+ if((inFlag & MessageFlag.FLIP) > 0)
flip = true;
else
flip = false;
@@ -452,6 +467,50 @@
kmerSize, msg.getKmer());
}
+ /**
+ * set head state
+ */
+ public void setHeadState(){
+ byte state = getVertexValue().getState();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_HEAD;
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * set final state
+ */
+ public void setFinalState(){
+ byte state = getVertexValue().getState();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_FINAL;
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * set final state
+ */
+ public void setStopFlag(){
+ byte state = incomingMsg.getFlag();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_STOP;
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * get Vertex state
+ */
+ public byte getMsgFlag(){
+ return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
+ }
+
+ /**
+ * reset selfFlag
+ */
+ public void resetSelfFlag(){
+ selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 429282b..500a903 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -44,10 +44,8 @@
BasicPathMergeVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- protected MessageWritable outgoingMsg2 = new MessageWritable();
- protected PositionWritable destVertexId2 = new PositionWritable();
-
- byte finalFlag;
+ PositionWritable tempPostition = new PositionWritable();
+
/**
* initiate kmerSize, maxIteration
*/
@@ -57,7 +55,9 @@
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+ selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
outgoingMsg.reset();
+ receivedMsgList.clear();
}
/**
@@ -65,16 +65,22 @@
*/
public void sendOutMsg() {
//send wantToMerge to next
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, outgoingMsg);
+ tempPostition = getNextDestVertexId(getVertexValue());
+ if(tempPostition != null){
+ destVertexId.set(tempPostition);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+ }
////send wantToMerge to prev
- destVertexId2.set(getPreDestVertexId(getVertexValue()));
- outgoingMsg2.setFlag(outFlag);
- outgoingMsg2.setSourceVertexId(getVertexId());
- sendMsg(destVertexId2, outgoingMsg2);
+ tempPostition = getPreDestVertexId(getVertexValue());
+ if(tempPostition != null){
+ destVertexId.set(tempPostition);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+ }
}
/**
@@ -84,88 +90,96 @@
int countHead = 0;
int countOldHead = 0;
for(int i = 0; i < receivedMsgList.size(); i++){
- if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
- countHead++;
- if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
- countOldHead++;
+ inFlag = receivedMsgList.get(i).getFlag();
+ switch(inFlag & MessageFlag.VERTEX_MASK){
+ case MessageFlag.IS_HEAD:
+ countHead++;
+ break;
+ case MessageFlag.IS_OLDHEAD:
+ countOldHead++;
+ break;
+ }
}
- if(countHead == 0 && countOldHead == 0)
- return MessageFromHead.BothMsgsFromNonHead;
- else if(countHead == 2)
+ if(countHead == 2)
return MessageFromHead.BothMsgsFromHead;
- else if(countOldHead == 2)
- return MessageFromHead.BothMsgsFromOldHead;
- else if(countHead == 1 && headFlag == 0)
- return MessageFromHead.OneMsgFromHeadToNonHead;
- else if(countHead == 1 && headFlag > 0)
- return MessageFromHead.OneMsgFromHeadToHead;
- else if(countOldHead == 1)
- return MessageFromHead.OneMsgFromNonHead;
-
- return MessageFromHead.NO_INFO;
+ else if(countHead == 1 && countOldHead == 1)
+ return MessageFromHead.OneMsgFromOldHeadAndOneFromHead;
+ else if(countHead == 1 && countOldHead == 0)
+ return MessageFromHead.OneMsgFromHeadAndOneFromNonHead;
+ else if(countHead == 0 && countOldHead == 0)
+ return MessageFromHead.BothMsgsFromNonHead;
+ else
+ return MessageFromHead.NO_MSG;
}
/**
* head send message to path
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
- //process merge when receiving msg
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- receivedMsgList.add(incomingMsg);
- }
- if(receivedMsgList.size() != 0){
- byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
- switch(numOfMsgsFromHead){
- case MessageFromHead.BothMsgsFromNonHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- break;
- case MessageFromHead.BothMsgsFromHead:
- case MessageFromHead.BothMsgsFromOldHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_FINAL);
- break;
- case MessageFromHead.OneMsgFromHeadToNonHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_HEAD);
- break;
- case MessageFromHead.OneMsgFromHeadToHead: //stop condition
- for(int i = 0; i < 2; i++){
- if(headFlag > 0){
- processMerge(receivedMsgList.get(i));
- break;
- }
- }
- getVertexValue().setState(MessageFlag.IS_FINAL);
- //voteToHalt();
- break;
- case MessageFromHead.OneMsgFromNonHead:
- //halt
- //voteToHalt();
- break;
- }
- }
//send out wantToMerge msg
- resetHeadFlag();
- finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
- outFlag = (byte)(headFlag | finalFlag);
- if(outFlag == 0)
- sendOutMsg();
+ if(selfFlag != MessageFlag.IS_HEAD){
+ sendOutMsg();
+ }
}
/**
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
+ if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
+ getVertexValue().setState(MessageFlag.IS_STOP);
+ sendOutMsg();
+ }
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- sendMergeMsg();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ processMerge(incomingMsg);
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ }else
+ sendMergeMsg();
}
}
+ /**
+ * head vertex process merge
+ */
+ public void processMergeInHeadVertex(Iterator<MessageWritable> msgIterator){
+ //process merge when receiving msg
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setStopFlag();
+ sendMergeMsg();
+ break;
+ }
+ receivedMsgList.add(incomingMsg);
+ }
+ if(receivedMsgList.size() != 0){
+ byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+ switch(numOfMsgsFromHead){
+ case MessageFromHead.BothMsgsFromHead:
+ case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ voteToHalt();
+ break;
+ case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ break;
+ case MessageFromHead.BothMsgsFromNonHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ break;
+ case MessageFromHead.NO_MSG:
+ //halt
+ deleteVertex(getVertexId());
+ break;
+ }
+ }
+ }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -173,13 +187,17 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
+ else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- if(headFlag == 0)
+ if(selfFlag != MessageFlag.IS_HEAD)
voteToHalt();
- } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
+ } else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- } else
+ if(selfFlag != MessageFlag.IS_HEAD)
+ voteToHalt();
+ } else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
+ processMergeInHeadVertex(msgIterator);
+ }else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index ef699fc..f70a76e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.type.State;
/*
* vertexId: BytesWritable
@@ -62,8 +61,6 @@
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
- private byte headFlag;
- private byte tailFlag;
private byte selfFlag;
/**
@@ -86,6 +83,9 @@
nextHead = false;
prevHead = false;
outFlag = (byte)0;
+ inFlag = (byte)0;
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
outgoingMsg.reset();
}
@@ -139,20 +139,22 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 4 == 3){
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
- tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
- outFlag = (byte) (headFlag | tailFlag);
+
+ //tailFlag = (byte) (MessageFlag.IS_TAIL & getVertexValue().getState());
+ //outFlag = (byte) (headFlag | tailFlag);
+ outFlag |= headFlag;
+ outFlag |= MessageFlag.NO_MERGE;
// only PATH vertices are present. Find the ID's for my neighbors
curID.set(getVertexId());
curHead = isNodeRandomHead(curID);
+
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
- hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+ hasNext = setNextInfo(getVertexValue());//&& headFlag == 0;
+ hasPrev = setPrevInfo(getVertexValue());//&& headFlag == 0;
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
@@ -165,27 +167,26 @@
} else {
// I'm a tail
if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
sendUpdateMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
- if (!nextHead && curID.compareTo(nextID) > 0) {
+ if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
sendUpdateMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
- if (!prevHead && curID.compareTo(prevID) > 0) {
+ if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
sendUpdateMsgToSuccessor();
}
}
}
}
- voteToHalt();
}
else if (getSuperstep() % 4 == 0){
//update neighber
@@ -196,17 +197,16 @@
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
broadcastMergeMsg();
- deleteVertex(getVertexId());
} else if (getSuperstep() % 4 == 2){
//merge kmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
+ selfFlag = (byte) (MessageFlag.VERTEX_MASK & getVertexValue().getState());
processMerge();
//head meets head, stop
- headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
- selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
- if((headFlag & selfFlag) > 0)
+ headFlag = (byte) (MessageFlag.VERTEX_MASK & incomingMsg.getFlag());
+ if(headFlag == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 27eb40a..d68c98b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -243,7 +243,7 @@
if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
} else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(MessageFlag.IS_TAIL);
+ getVertexValue().setState(MessageFlag.IS_HEAD);
getVertexValue().setKmer(getVertexValue().getKmer());
//voteToHalt();
} //else
@@ -445,7 +445,7 @@
// We prevent merging towards non-path nodes
hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
getVertexValue().setState(outFlag);
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index c130ec6..855deaa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -2,21 +2,14 @@
import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
-public class MessageFlag extends MergeDirFlag{
+public class MessageFlag extends MergeDirFlag {
- public static final byte FLIP = 1 << 3;
- public static final byte IS_HEAD = 1 << 4;
- public static final byte IS_TAIL = 1 << 5;
- public static final byte IS_OLDHEAD = 1 << 6;
- public static final byte IS_FINAL = 0b000011;
+ public static final byte FLIP = 1 << 6;
+
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
- case IS_HEAD:
- return "IS_HEAD";
- case IS_OLDHEAD:
- return "IS_OLDHEAD";
case FLIP:
return "FLIP";
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
index f343c2e..05a2f95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -1,13 +1,17 @@
package edu.uci.ics.genomix.pregelix.type;
public class MessageFromHead {
- public static final byte BothMsgsFromHead = 1 << 0;
- public static final byte BothMsgsFromNonHead = 1 << 1;
- public static final byte BothMsgsFromOldHead = 1 << 2;
- public static final byte OneMsgFromHead = 1 << 3;
- public static final byte OneMsgFromNonHead = 1 << 4;
- public static final byte OneMsgFromHeadToNonHead = 1 << 5;
- public static final byte OneMsgFromHeadToHead = 1 << 6;
+ public static final byte BothMsgsFromHead = 0b0000 << 1;
+ public static final byte BothMsgsFromNonHead = 0b0001 << 1;
+ public static final byte BothMsgsFromOldHead = 0b0010 << 1;
+ public static final byte OneMsgFromHead = 0b0011 << 1;
+ public static final byte OneMsgFromNonHead = 0b0100 << 1;
+ public static final byte OneMsgFromHeadAndOneFromNonHead = 0b0101 << 1;
+ public static final byte OneMsgFromHeadToHead = 0b0110 << 1;
+ public static final byte OneMsgFromOldHeadToNonHead = 0b0111 << 1;
+ public static final byte OneMsgFromOldHeadToHead = 0b1000 << 1;
+ public static final byte OneMsgFromOldHeadAndOneFromHead = 0b1001 << 1;
+ public static final byte NO_MSG = 0b1010 << 1;
public static final byte NO_INFO = 0 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 5a113e0..0a366db 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -201,7 +201,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
@@ -209,7 +209,7 @@
//genBridgeRemoveGraph();
//genBubbleAddGraph();
//genBubbleMergeGraph();
- genP4ForMergeGraph();
+ //genP4ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 355bc7a..7bf5aa3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 3;
- private static final int ReadLength = 7;
+ private static final int ReadLength = 8;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/read2.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";