Extract common method for P1 and P4
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 2b8434d..08d0a8a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -576,58 +576,13 @@
+ " direction != 1!\n" + vertex);
VKmerBytesWritable dest = vertex.getEdgeList(mergeEdgetype).get(0).getKey();
sendMsg(dest, outgoingMsg);
- deleteVertex(getVertexId());
if (verbose) {
LOG.fine("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg
+ "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState())
+ ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
- LOG.fine("killing self: " + getVertexId());
}
}
}
- /**
- * step4: receive and process Merges
- */
- public void receiveMerges(Iterator<M> msgIterator) {
- VertexValueWritable vertex = getVertexValue();
- NodeWritable node = vertex.getNode();
- short state = vertex.getState();
- boolean updated = false;
- EDGETYPE senderEdgetype;
- @SuppressWarnings("unused")
- int numMerged = 0;
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if (verbose)
- LOG.fine("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
- senderEdgetype = EDGETYPE.fromByte(incomingMsg.getFlag());
- node.mergeWithNode(senderEdgetype, incomingMsg.getNode());
- state |= (byte) (incomingMsg.getFlag() & DIR.MASK); // update incoming restricted directions
- numMerged++;
- updated = true;
- if (verbose)
- LOG.fine("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
- }
- if (isTandemRepeat(getVertexValue())) {
- // tandem repeats can't merge anymore; restrict all future merges
- state |= DIR.NEXT.get();
- state |= DIR.PREVIOUS.get();
- updated = true;
- if (verbose)
- LOG.fine("recieveMerges is a tandem repeat: " + getVertexId() + " " + getVertexValue());
- // updateStatisticsCounter(StatisticsCounter.Num_Cycles);
- }
- // updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
- // getVertexValue().setCounters(counters);
- if (updated) {
- vertex.setState(state);
- if (DIR.enumSetFromByte(state).containsAll(EnumSet.allOf(DIR.class)))
- voteToHalt();
- else
- activate();
- }
- }
-
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index b103174..4fe890e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -230,33 +230,14 @@
/**
* for P1
*/
+ @Override
public void sendMergeMsg() {
- VertexValueWritable vertex = getVertexValue();
- short state = vertex.getState();
- if ((state & P4State.MERGE) != 0) {
- outgoingMsg.reset();
- // tell neighbor where this is coming from (so they can merge kmers and delete)
- EDGETYPE mergeEdgetype = EDGETYPE.fromByte(vertex.getState());
- byte neighborRestrictions = DIR.fromSet(mergeEdgetype.causesFlip() ? DIR.flipSetFromByte(state) : DIR.enumSetFromByte(state));
-
- outgoingMsg.setFlag((short) (mergeEdgetype.mirror().get() | neighborRestrictions));
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setNode(vertex.getNode());
- if (vertex.getDegree(mergeEdgetype.dir()) != 1)
- throw new IllegalStateException("Merge attempted in node with degree in " + mergeEdgetype
- + " direction != 1!\n" + vertex);
- VKmerBytesWritable dest = vertex.getEdgeList(mergeEdgetype).get(0).getKey();
- sendMsg(dest, outgoingMsg);
-
- if (verbose) {
- LOG.fine("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg
- + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState())
- + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
- }
-
+ super.sendMergeMsg();
+ short state = getVertexValue().getState();
+ if ((getVertexValue().getState() & P4State.MERGE) != 0) {
// set flag to NO_MERGE instead of deleteVertex
state |= P4State.NO_MERGE;
- vertex.setState(state);
+ getVertexValue().setState(state);
voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 81bc71e..9223bec 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -13,6 +13,7 @@
import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
import edu.uci.ics.genomix.type.NodeWritable.DIR;
import edu.uci.ics.genomix.type.NodeWritable.EDGETYPE;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -164,6 +165,61 @@
}
}
+ /**
+ * for P4
+ */
+ @Override
+ public void sendMergeMsg(){
+ super.sendMergeMsg();
+ if ((getVertexValue().getState() & P4State.MERGE) != 0) {
+ deleteVertex(getVertexId());
+ LOG.fine("killing self: " + getVertexId());
+ }
+ }
+
+ /**
+ * step4: receive and process Merges
+ */
+ public void receiveMerges(Iterator<PathMergeMessageWritable> msgIterator) {
+ VertexValueWritable vertex = getVertexValue();
+ NodeWritable node = vertex.getNode();
+ short state = vertex.getState();
+ boolean updated = false;
+ EDGETYPE senderEdgetype;
+ @SuppressWarnings("unused")
+ int numMerged = 0;
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if (verbose)
+ LOG.fine("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+ senderEdgetype = EDGETYPE.fromByte(incomingMsg.getFlag());
+ node.mergeWithNode(senderEdgetype, incomingMsg.getNode());
+ state |= (byte) (incomingMsg.getFlag() & DIR.MASK); // update incoming restricted directions
+ numMerged++;
+ updated = true;
+ if (verbose)
+ LOG.fine("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+ }
+ if (isTandemRepeat(getVertexValue())) {
+ // tandem repeats can't merge anymore; restrict all future merges
+ state |= DIR.NEXT.get();
+ state |= DIR.PREVIOUS.get();
+ updated = true;
+ if (verbose)
+ LOG.fine("recieveMerges is a tandem repeat: " + getVertexId() + " " + getVertexValue());
+ // updateStatisticsCounter(StatisticsCounter.Num_Cycles);
+ }
+ // updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
+ // getVertexValue().setCounters(counters);
+ if (updated) {
+ vertex.setState(state);
+ if (DIR.enumSetFromByte(state).containsAll(EnumSet.allOf(DIR.class)))
+ voteToHalt();
+ else
+ activate();
+ }
+ }
+
@Override
public void compute(Iterator<PathMergeMessageWritable> msgIterator) throws HyracksDataException {
initVertex();
diff --git a/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt b/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
index 22e8d8d..889338b 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
@@ -1,2 +1,2 @@
-P1ForMergeGraph.xml
P4ForMergeGraph.xml
+P1ForMergeGraph.xml