fix path merge flaws
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 935da50..df49879 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -21,8 +21,7 @@
private PositionWritable sourceVertexId;
private KmerBytesWritable chainVertexId;
private AdjacencyListWritable neighberNode; //incoming or outgoing
- private byte message;
- private byte adjMessage;
+ private byte flag;
private byte checkMessage;
@@ -30,8 +29,7 @@
sourceVertexId = new PositionWritable();
chainVertexId = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
- message = Message.NON;
- adjMessage = AdjMessage.NON;
+ flag = Message.NON;
checkMessage = (byte) 0;
}
@@ -50,8 +48,7 @@
this.neighberNode.set(msg.getNeighberNode());
}
checkMessage |= CheckMessage.ADJMSG;
- this.adjMessage = msg.getAdjMessage();
- this.message = msg.getMessage();
+ this.flag = msg.getFlag();
}
public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -68,14 +65,14 @@
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(neighberNode);
}
- this.message = message;
+ this.flag = message;
}
public void reset() {
checkMessage = 0;
chainVertexId.reset(1);
neighberNode.reset();
- message = Message.NON;
+ flag = Message.NON;
}
public PositionWritable getSourceVertexId() {
@@ -114,22 +111,13 @@
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
-
- public byte getAdjMessage() {
- return adjMessage;
+
+ public byte getFlag() {
+ return flag;
}
- public void setAdjMessage(byte adjMessage) {
- checkMessage |= CheckMessage.ADJMSG;
- this.adjMessage = adjMessage;
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
+ public void setFlag(byte message) {
+ this.flag = message;
}
@Override
@@ -141,9 +129,7 @@
chainVertexId.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
- if ((checkMessage & CheckMessage.ADJMSG) != 0)
- out.write(adjMessage);
- out.write(message);
+ out.write(flag);
}
@Override
@@ -156,9 +142,7 @@
chainVertexId.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
- if ((checkMessage & CheckMessage.ADJMSG) != 0)
- adjMessage = in.readByte();
- message = in.readByte();
+ flag = in.readByte();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index de483bc..15f8e96 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -4,6 +4,7 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
@@ -117,6 +118,14 @@
this.mergeChain.set(mergeChain);
}
+ public KmerBytesWritable getKmer() {
+ return mergeChain;
+ }
+
+ public void setKmer(KmerBytesWritable mergeChain) {
+ this.mergeChain.set(mergeChain);
+ }
+
public PositionWritable getMergeDest() {
return mergeDest;
}
@@ -167,4 +176,81 @@
public int outDegree(){
return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
}
+
+ /*
+ * Process any changes to value. This is for edge updates
+ */
+ public void processUpdates(byte neighborToDeleteDir, PositionWritable nodeToDelete,
+ byte neighborToMergeDir, PositionWritable nodeToAdd){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().append(nodeToAdd);
+ break;
+ }
+ }
+
+ /*
+ * Process any changes to value. This is for merging
+ */
+ public void processMerges(byte neighborToDeleteDir, PositionWritable nodeToDelete,
+ byte neighborToMergeDir, PositionWritable nodeToAdd,
+ int kmerSize, KmerBytesWritable kmer){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete); //set(null);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getKmer().mergeWithFFKmer(kmerSize, kmer);
+ this.getFFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getKmer().mergeWithFRKmer(kmerSize, kmer);
+ this.getFRList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getKmer().mergeWithRFKmer(kmerSize, kmer);
+ this.getRFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getKmer().mergeWithRRKmer(kmerSize, kmer);
+ this.getRRList().append(nodeToAdd);
+ break;
+ }
+ }
+
+
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index dca2cb8..0e4ff95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -75,7 +75,7 @@
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
}
- builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
+ builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
if (msg.getLengthOfChain() != -1) {
chain = msg.getChainVertexId().toString();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 2770ab8..d05a00b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -79,14 +79,14 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -99,14 +99,14 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -118,61 +118,61 @@
*/
public void broadcaseKillself(){
outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
} // RR
- else if(receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
}
@@ -184,7 +184,7 @@
public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ if(incomingMsg.getFlag() == AdjMessage.FROMFF){
//remove incomingMsg.getSourceId from RR positionList
iterator = getVertexValue().getRRList().iterator();
while(iterator.hasNext()){
@@ -194,7 +194,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
//remove incomingMsg.getSourceId from RF positionList
iterator = getVertexValue().getRFList().iterator();
while(iterator.hasNext()){
@@ -204,7 +204,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
//remove incomingMsg.getSourceId from FR positionList
iterator = getVertexValue().getFRList().iterator();
while(iterator.hasNext()){
@@ -214,7 +214,7 @@
break;
}
}
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
//remove incomingMsg.getSourceId from FF positionList
iterator = getVertexValue().getFFList().iterator();
while(iterator.hasNext()){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 396d43d..6b7bc5d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -181,7 +181,7 @@
public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
- if(VertexUtil.isHeadVertex(getVertexValue())
+ if(VertexUtil.isHeadVertexWithIndegree(getVertexValue())
|| VertexUtil.isHeadWithoutIndegree(getVertexValue())){
outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setSourceVertexId(getVertexId());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index cff6cd7..992dbaa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -126,23 +126,23 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
sendMsgToAllNextNodes(getVertexValue());
voteToHalt();
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
@@ -169,10 +169,10 @@
* set vertex state
*/
public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
+ if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
//getVertexValue().setMergeChain(null);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
getVertexValue().setMergeChain(getVertexValue().getMergeChain());
voteToHalt();
@@ -185,11 +185,11 @@
*/
public void sendOutMsg() {
if (getVertexValue().getState() == State.START_VERTEX) {
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
} else if (getVertexValue().getState() != State.END_VERTEX) {
- outgoingMsg.setMessage(Message.NON);
+ outgoingMsg.setFlag(Message.NON);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
}
@@ -205,7 +205,7 @@
if (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if (mergeChainVertex()) {
- if (incomingMsg.getMessage() == Message.END) {
+ if (incomingMsg.getFlag() == Message.END) {
if (getVertexValue().getState() == State.START_VERTEX) {
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
@@ -228,10 +228,10 @@
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- if (incomingMsg.getMessage() == Message.START)
+ if (incomingMsg.getFlag() == Message.START)
deleteVertex(getVertexId());
} else {
if (getVertexValue().getState() != State.START_VERTEX && getVertexValue().getState() != State.END_VERTEX)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index 47ca06e..e9d359f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -131,20 +131,20 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
}
@@ -170,9 +170,9 @@
* set vertex state
*/
public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
+ if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
voteToHalt();
} else
@@ -201,7 +201,7 @@
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (incomingMsg.getMessage() != Message.STOP) {
+ if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
@@ -224,7 +224,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
destVertexId.set(incomingMsg.getSourceVertexId());
sendMsg(destVertexId, outgoingMsg);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 949e8bb..c9762f2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -138,20 +138,20 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
}
@@ -191,9 +191,9 @@
* set vertex state
*/
public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
+ if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
voteToHalt();
} else
@@ -205,7 +205,7 @@
*/
public void markPseudoHead() {
getVertexValue().setState(State.PSEUDOHEAD);
- outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
destVertexId
.set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -215,12 +215,12 @@
* mark the pseudoRear
*/
public void markPseudoRear() {
- if (incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ if (incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
&& getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.PSEUDOREAR);
voteToHalt();
}
- else if(incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ else if(incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
&& getVertexValue().getState() == State.START_VERTEX){
getVertexValue().setState(State.START_HALT);
}
@@ -249,7 +249,7 @@
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (incomingMsg.getMessage() != Message.STOP) {
+ if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId
@@ -273,7 +273,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
}
@@ -292,10 +292,10 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
//if from pseudoHead, voteToHalt(), otherwise ...
- if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){
+ if (incomingMsg.getFlag() != Message.FROMPSEUDOHEAD){
mergeChainVertex();
- if (incomingMsg.getMessage() != Message.STOP
- && incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
+ if (incomingMsg.getFlag() != Message.STOP
+ && incomingMsg.getFlag() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -303,12 +303,12 @@
} else {
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.STOP) {
+ && incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
- && incomingMsg.getMessage() == Message.STOP)
+ && incomingMsg.getFlag() == Message.STOP)
getVertexValue().setState(State.END_VERTEX);
}
}
@@ -321,15 +321,15 @@
*/
public void responseMsgToHeadVertexPartitionPhase() {
if (getVertexValue().getState() == State.PSEUDOHEAD)
- outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.PSEUDOREAR)
- outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
+ outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
}
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
voteToHalt();
@@ -345,12 +345,12 @@
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.STOP) {
+ && incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
- && incomingMsg.getMessage() == Message.STOP)
+ && incomingMsg.getFlag() == Message.STOP)
getVertexValue().setState(State.END_VERTEX);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index b6c03fb..39fe619 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
@@ -14,7 +15,6 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -75,11 +75,10 @@
private byte headFlag;
private byte tailFlag;
private byte outFlag;
- private byte outUpFlag;
+ private byte selfFlag;
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
- private MessageWritable outgoingUpMsg = new MessageWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
@@ -101,6 +100,7 @@
curHead = false;
nextHead = false;
prevHead = false;
+ outFlag = (byte)0;
outgoingMsg.reset();
}
@@ -199,23 +199,23 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
sendMsgToAllNextNodes(getVertexValue());
voteToHalt();
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
@@ -233,24 +233,10 @@
voteToHalt();
} else {
incomingMsg = msgIterator.next();
- setState();
+ getVertexValue().setState(MessageFlag.IS_HEAD);
}
}
}
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
- getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(MessageFlag.IS_TAIL);
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
- //voteToHalt();
- } //else
- //voteToHalt();
- }
/**
* check if A need to be flipped with successor
@@ -277,9 +263,9 @@
*/
public void setSuccessorAdjMsg(){
if(getVertexValue().getFFList().getLength() > 0)
- outgoingMsg.setAdjMessage(AdjMessage.FROMFF);
+ outFlag |= MessageFlag.DIR_FF;
else
- outgoingMsg.setAdjMessage(AdjMessage.FROMFR);
+ outFlag |= MessageFlag.DIR_FR;
}
/**
@@ -287,41 +273,79 @@
*/
public void setPredecessorAdjMsg(){
if(getVertexValue().getRFList().getLength() > 0)
- outgoingMsg.setAdjMessage(AdjMessage.FROMRF);
+ outFlag |= MessageFlag.DIR_RF;
else
- outgoingMsg.setAdjMessage(AdjMessage.FROMRR);
+ outFlag |= MessageFlag.DIR_RF;
}
/**
* send update message to neighber
+ * @throws IOException
*/
public void broadcastUpdateMsg(){
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- setSuccessorAdjMsg();
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- outUpFlag = (byte)(MessageFlag.FROM_SUCCESSOR);
- outgoingUpMsg.setNeighberNode(getVertexValue().getOutgoingList());
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingUpMsg.setMessage(outUpFlag);
- outgoingUpMsg.setSourceVertexId(getVertexId());
- setPredecessorAdjMsg();
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingUpMsg);
- //remove its own neighbers
- getVertexValue().setIncomingList(null);
- getVertexValue().setOutgoingList(null);
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ outFlag |= MessageFlag.IS_HEAD;
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * send merge message to neighber
+ * @throws IOException
+ */
+ public void broadcastMergeMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ outFlag |= MessageFlag.IS_HEAD;
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
}
/**
* This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
*/
public void sendUpMsgFromPredecessor(){
- getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHNEXT);
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
if(getVertexValue().getFFList().getLength() > 0)
getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
else
@@ -331,9 +355,12 @@
/**
* This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
*/
public void sendUpMsgFromSuccessor(){
- getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHPREV);
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
if(getVertexValue().getRFList().getLength() > 0)
getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
else
@@ -342,56 +369,79 @@
}
/**
+ * Returns the edge dir for B->A when the A->B edge is type @dir
+ */
+ public byte mirrorDirection(byte dir) {
+ switch (dir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RF;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_FF;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+ }
+ }
+
+ /**
+ * check if need filp
+ */
+ public byte flipDirection(byte neighborDir, boolean flip){
+ if(flip){
+ switch (neighborDir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FF;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_RF;
+ default:
+ throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+ }
+ } else
+ return neighborDir;
+ }
+
+ /**
* updateAdjList
*/
- public void updateAdjList(){
- if(incomingMsg.getAdjMessage() == AdjMessage.FROMFF){
- getVertexValue().setRRList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMFR){
- getVertexValue().setFRList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRF){
- getVertexValue().setRFList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRR){
- getVertexValue().setFFList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getReverseList());
- }
+ public void processUpdate(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
}
/**
- * update AdjacencyList if message from predecessor
+ * merge and updateAdjList
*/
- public void updateAdjList_MsgPredecessor(){
- if((outFlag & MessageFlag.FLIP) > 0){
- updateAdjList();
- } else {
- getVertexValue().setIncomingList(incomingMsg.getNeighberNode());
- }
- }
-
- /**
- * update AdjacencyList if message from successor
- */
- public void updateAdjList_MsgSuccessor(){
- if((outFlag & MessageFlag.FLIP) > 0){
- updateAdjList();
- } else {
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- }
+ public void processMerge(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+ kmerSize, incomingMsg.getChainVertexId());
}
@Override
@@ -457,44 +507,22 @@
//update neighber
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- outFlag = incomingMsg.getMessage();
- if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
- updateAdjList_MsgPredecessor();
- }
- else {//Message from successor.
- updateAdjList_MsgSuccessor();
- }
+ processUpdate();
}
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
- if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHNEXT) > 0){
- setSuccessorAdjMsg();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHPREV) > 0){
- setPredecessorAdjMsg();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
- deleteVertex(getVertexId());
- }
+ broadcastMergeMsg();
+ deleteVertex(getVertexId());
} else if (getSuperstep() % 4 == 2){
//merge kmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- outFlag = incomingMsg.getMessage();
- if(outFlag == AdjMessage.FROMFF){
- //mergeWithRR(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMFR){
- //mergeWithRF(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMRF){
- //mergeWithFR(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMRR){
- //mergeWithFF(incomingMsg.getChain())
- }
+ processMerge();
+
+ headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
+ selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
+ if((headFlag & selfFlag) > 0)
+ voteToHalt();
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
new file mode 100644
index 0000000..75d53e9
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -0,0 +1,501 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class P5ForPathMergeVertex extends
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
+ public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
+ public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private static long randSeed = -1;
+ private float probBeingRandomHead = -1;
+ private Random randGenerator;
+
+ private PositionWritable curID = new PositionWritable();
+ private PositionWritable nextID = new PositionWritable();
+ private PositionWritable prevID = new PositionWritable();
+ private boolean hasNext;
+ private boolean hasPrev;
+ private boolean curHead;
+ private boolean nextHead;
+ private boolean prevHead;
+ private byte headFlag;
+ private byte tailFlag;
+ private byte outFlag;
+
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if (randSeed < 0)
+ randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ if (probBeingRandomHead < 0)
+ probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
+ hasNext = false;
+ hasPrev = false;
+ curHead = false;
+ nextHead = false;
+ prevHead = false;
+ outgoingMsg.reset();
+ }
+
+ protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ // "deterministically random", based on node id
+ randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ return randGenerator.nextFloat() < probBeingRandomHead;
+ }
+
+ /**
+ * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
+ */
+ protected boolean setNextInfo(VertexValueWritable value) {
+ if (value.getFFList().getCountOfPosition() > 0) {
+ nextID.set(value.getFFList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ if (value.getFRList().getCountOfPosition() > 0) {
+ nextID.set(value.getFRList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
+ */
+ protected boolean setPrevInfo(VertexValueWritable value) {
+ if (value.getRRList().getCountOfPosition() > 0) {
+ prevID.set(value.getRRList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ if (value.getRFList().getCountOfPosition() > 0) {
+ prevID.set(value.getRFList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * get destination vertex
+ */
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
+ return posIterator.next();
+ }
+
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
+ return posIterator.next();
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * start sending message
+ */
+ public void startSendMsg() {
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
+ sendMsgToAllNextNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
+ sendMsgToAllPreviousNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setFlag(Message.START);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setFlag(Message.END);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ }
+
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<MessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!VertexUtil.isPathVertex(getVertexValue())
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+ msgIterator.next();
+ voteToHalt();
+ } else {
+ incomingMsg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+
+ /**
+ * set vertex state
+ */
+ public void setState() {
+ if (incomingMsg.getFlag() == Message.START) {
+ getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ getVertexValue().setState(MessageFlag.IS_TAIL);
+ getVertexValue().setMergeChain(getVertexValue().getMergeChain());
+ //voteToHalt();
+ } //else
+ //voteToHalt();
+ }
+
+ /**
+ * check if A need to be flipped with successor
+ */
+ public boolean ifFilpWithSuccessor(){
+ if(getVertexValue().getFRList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * check if A need to be filpped with predecessor
+ */
+ public boolean ifFlipWithPredecessor(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * set adjMessage to successor(from predecessor)
+ */
+ public void setSuccessorAdjMsg(){
+ if(getVertexValue().getFFList().getLength() > 0)
+ outFlag |= MessageFlag.DIR_FF;
+ else
+ outFlag |= MessageFlag.DIR_FR;
+ }
+
+ /**
+ * set adjMessage to predecessor(from successor)
+ */
+ public void setPredecessorAdjMsg(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ outFlag |= MessageFlag.DIR_RF;
+ else
+ outFlag |= MessageFlag.DIR_RF;
+ }
+
+ /**
+ * send update message to neighber
+ * @throws IOException
+ */
+ public void broadcastUpdateMsg(){
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
+ */
+ public void sendUpMsgFromPredecessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ if(getVertexValue().getFFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
+ */
+ public void sendUpMsgFromSuccessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
+ if(getVertexValue().getRFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * Returns the edge dir for B->A when the A->B edge is type @dir
+ */
+ public byte mirrorDirection(byte dir) {
+ switch (dir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RF;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_FF;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+ }
+ }
+
+ /**
+ * check if need filp
+ */
+ public byte flipDirection(byte neighborDir, boolean flip){
+ if(flip){
+ switch (neighborDir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FF;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_RF;
+ default:
+ throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+ }
+ } else
+ return neighborDir;
+ }
+
+ /**
+ * updateAdjList
+ */
+ public void processUpdate(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
+ }
+
+ /**
+ * merge and updateAdjList
+ */
+ public void processMerge(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+ kmerSize, incomingMsg.getChainVertexId());
+ }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if (getSuperstep() == 2)
+ initState(msgIterator);
+ else if (getSuperstep() % 4 == 3){
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
+ tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
+ outFlag = (byte) (headFlag | tailFlag);
+
+ // only PATH vertices are present. Find the ID's for my neighbors
+ curID.set(getVertexId());
+
+ curHead = isNodeRandomHead(curID);
+
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
+ hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ getVertexValue().setState(outFlag);
+ voteToHalt();
+ }
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ sendUpMsgFromPredecessor();
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ sendUpMsgFromSuccessor();
+ }
+ } else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curID.compareTo(nextID) < 0) {
+ // merge towards tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curID.compareTo(prevID) < 0) {
+ // merge towards tail in reverse dir
+ sendUpMsgFromSuccessor();
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(P5ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P5ForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 61e7ae9..62f941f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -97,9 +97,9 @@
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfMergeChain() <= length){
if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -109,9 +109,9 @@
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfMergeChain() <= length){
if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -126,7 +126,7 @@
else if(getSuperstep() == 2){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ if(incomingMsg.getFlag() == AdjMessage.FROMFF){
//remove incomingMsg.getSourceId from RR positionList
iterator = getVertexValue().getRRList().iterator();
while(iterator.hasNext()){
@@ -136,7 +136,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
//remove incomingMsg.getSourceId from RF positionList
iterator = getVertexValue().getRFList().iterator();
while(iterator.hasNext()){
@@ -146,7 +146,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
//remove incomingMsg.getSourceId from FR positionList
iterator = getVertexValue().getFRList().iterator();
while(iterator.hasNext()){
@@ -156,7 +156,7 @@
break;
}
}
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
//remove incomingMsg.getSourceId from FF positionList
iterator = getVertexValue().getFFList().iterator();
while(iterator.hasNext()){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 0a5428b..90100e9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -1,13 +1,13 @@
package edu.uci.ics.genomix.pregelix.type;
-public class MessageFlag {
- public static final byte FLIP = 0;
- public static final byte IS_HEAD = 1 << 1;
- public static final byte IS_TAIL = 1 << 2;
- public static final byte SHOULD_MERGEWITHNEXT = 1 << 3;
- public static final byte SHOULD_MERGEWITHPREV = 1 << 4;
- public static final byte FROM_SUCCESSOR = 1 << 5;
- public static final byte FROM_PREDECESSOR = 1 << 6;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+
+public class MessageFlag extends DirectionFlag {
+ public static final byte FLIP = 1 << 2;
+ public static final byte IS_HEAD = 1 << 3;
+ public static final byte IS_TAIL = 1 << 4;
+ public static final byte SHOULD_MERGEWITHNEXT = 1 << 5;
+ public static final byte SHOULD_MERGEWITHPREV = 1 << 6;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
@@ -22,10 +22,6 @@
return "SHOULD_MERGEWITHPREV";
case FLIP:
return "FLIP";
- case FROM_SUCCESSOR:
- return "FROM_SUCCESSOR";
- case FROM_PREDECESSOR:
- return "FROM_PREDECESSOR";
}
return "ERROR_BAD_MESSAGE";
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 67c9792..a4aaa84 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,7 +1,9 @@
package edu.uci.ics.genomix.pregelix.util;
+import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
public class VertexUtil {
/**
@@ -18,7 +20,7 @@
*
* @param vertexValue
*/
- public static boolean isHeadVertex(VertexValueWritable value) {
+ public static boolean isHeadVertexWithIndegree(VertexValueWritable value) {
return value.outDegree() > 0 && !isPathVertex(value) && !isHeadWithoutIndegree(value);
}
@@ -27,7 +29,7 @@
*
* @param vertexValue
*/
- public static boolean isRearVertex(VertexValueWritable value) {
+ public static boolean isRearVertexWithOutdegree(VertexValueWritable value) {
return value.inDegree() > 0 && !isPathVertex(value) && !isRearWithoutOutdegree(value);
}
@@ -93,4 +95,14 @@
public static boolean isDownBridgeVertex(VertexValueWritable value){
return value.inDegree() > 1 && value.outDegree() == 1;
}
+
+ /**
+ * get nodeId from Ad
+ */
+ public static PositionWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
+ if(adj.getForwardList().getCountOfPosition() > 0)
+ return adj.getForwardList().getPosition(0);
+ else
+ return adj.getReverseList().getPosition(0);
+ }
}