code review on P4 refactor
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index d94cdfa..96ea216 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -171,7 +171,8 @@
BtoA &= DIR_MASK;
BtoC &= DIR_MASK;
byte AtoB = mirrorEdge(BtoA);
- // two rules apply:
+ // a valid path must exist from A to C
+ // specifically, two rules apply for AtoB and BtoC
// 1) the internal letters must be the same (so FF, RF will be an error)
// 2) the final direction is the 1st letter of AtoB + 2nd letter of BtoC
// TODO? maybe we could use the string version to resolve this following above rules
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 085c825..d24d208 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -30,7 +30,7 @@
public class P4ForPathMergeVertex extends
BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
- private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
+// private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
private static long randSeed = 1; //static for save memory
private float probBeingRandomHead = -1;
@@ -91,18 +91,32 @@
public void restrictNeighbors() {
EnumSet<DIR> dirsToRestrict;
VertexValueWritable vertex = getVertexValue();
+ short state = vertex.getState();
+ boolean updated = false;
if(isTandemRepeat(vertex)) {
// tandem repeats are not allowed to merge at all
dirsToRestrict = EnumSet.of(DIR.NEXT, DIR.PREVIOUS);
+ state |= DIR.NEXT.get() | DIR.PREVIOUS.get(); // tandem repeats are excluded from any merging
+ updated = true;
}
else {
// degree > 1 can't merge in that direction
dirsToRestrict = EnumSet.noneOf(DIR.class);
for (DIR dir : DIR.values()) {
- if (vertex.getDegree(dir) > 1)
+ if (vertex.getDegree(dir) > 1 || vertex.getDegree(dir) == 0) {
dirsToRestrict.add(dir);
+ state |= dir.get();
+ updated = true;
+ }
}
}
+ if (updated) {
+ vertex.setState(state);
+ if (DIR.enumSetFromByte(state).containsAll(EnumSet.allOf(DIR.class)))
+ voteToHalt();
+ else
+ activate();
+ }
// send a message to each neighbor indicating they can't merge towards me
for (DIR dir : dirsToRestrict) {
@@ -111,7 +125,7 @@
outgoingMsg.reset();
// outgoingMsg.setFlag(dir.mirror().get());
outgoingMsg.setFlag(DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)).get());
- LOG.info("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
+// LOG.info("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
sendMsg(destId, outgoingMsg);
}
}
@@ -122,19 +136,19 @@
* initiate head, rear and path node
*/
public void recieveRestrictions(Iterator<PathMergeMessageWritable> msgIterator) {
- short restrictedDirs = 0;
+ short restrictedDirs = 0; // the directions (NEXT/PREVIOUS) that I'm not allowed to merge in
while (msgIterator.hasNext()) {
- LOG.info("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
+// LOG.info("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
incomingMsg = msgIterator.next();
restrictedDirs |= incomingMsg.getFlag();
- LOG.info("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
+// LOG.info("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
activate();
}
// special case: tandem repeats cannot merge at all
if (isTandemRepeat(getVertexValue())) {
restrictedDirs |= DIR.PREVIOUS.get();
restrictedDirs |= DIR.NEXT.get();
- LOG.info("after tandem repeat restriction: " + DIR.fromByte(restrictedDirs));
+// LOG.info("after tandem repeat restriction: " + DIR.fromByte(restrictedDirs));
voteToHalt();
}
getVertexValue().setState(restrictedDirs);
@@ -157,7 +171,7 @@
VertexValueWritable vertex = getVertexValue();
EnumSet<DIR> restrictedDirs = DIR.enumSetFromByte(vertex.getState());
// NEXT restricted by neighbor or by my edges?
- if (restrictedDirs.contains(DIR.NEXT) || vertex.outDegree() != 1) {
+ if (restrictedDirs.contains(DIR.NEXT) || vertex.outDegree() != 1) { // TODO should I restrict based on degree in the first iteration?
hasNext = false;
} else {
hasNext = true;
@@ -220,11 +234,11 @@
}
}
}
- if ((getVertexValue().getState() & P4State.MERGE) == 0) {
- LOG.info("No merge for " + getVertexId());
- } else {
- LOG.info("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
- }
+// if ((getVertexValue().getState() & P4State.MERGE) == 0) {
+// LOG.info("No merge for " + getVertexId());
+// } else {
+// LOG.info("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
+// }
}
public void updateNeighbors() {
@@ -238,7 +252,7 @@
byte[] mergeEdges = NodeWritable.edgeTypesInDir(mergeDir);
DIR updateDir = mergeDir.mirror();
- byte[] updateEdges = NodeWritable.edgeTypesInDir(updateDir);
+ byte[] updateEdges = NodeWritable.edgeTypesInDir(updateDir); //
// prepare the update message s.t. the receiver can do a simple unionupdate
// that means we figure out any hops and place our merge-dir edges in the appropriate list of the outgoing msg
@@ -253,7 +267,7 @@
// send the update to all kmers in this list // TODO perhaps we could skip all this if there are no neighbors here
for (VKmerBytesWritable dest : vertex.getEdgeList(updateEdge).getKeys()) {
- LOG.info("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
+// LOG.info("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
sendMsg(dest, outgoingMsg);
}
}
@@ -263,7 +277,7 @@
VertexValueWritable vertex = getVertexValue();
NodeWritable node = vertex.getNode();
while (msgIterator.hasNext()) {
- LOG.info("before update from neighbor: " + getVertexValue());
+// LOG.info("before update from neighbor: " + getVertexValue());
incomingMsg = msgIterator.next();
// remove the edge to the node that will merge elsewhere
node.getEdgeList((byte)(incomingMsg.getFlag() & DirectionFlag.DIR_MASK)).remove(incomingMsg.getSourceVertexId());
@@ -271,7 +285,7 @@
for (byte dir : DirectionFlag.values) {
node.getEdgeList(dir).unionUpdate(incomingMsg.getEdgeList(dir));
}
- LOG.info("after update from neighbor: " + getVertexValue());
+// LOG.info("after update from neighbor: " + getVertexValue());
}
checkNeighbors();
if (!hasNext && !hasPrev)
@@ -295,10 +309,10 @@
if (vertex.getDegree(DirectionFlag.dirFromEdgeType(mergeDir)) != 1)
throw new IllegalStateException("Merge attempted in node with degree in " + mergeDir + " direction != 1!\n" + vertex);
VKmerBytesWritable dest = vertex.getEdgeList(mergeDir).get(0).getKey();
- LOG.info("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
+// LOG.info("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
sendMsg(dest, outgoingMsg);
- LOG.info("killing self: " + getVertexId());
+// LOG.info("killing self: " + getVertexId());
deleteVertex(getVertexId());
}
}
@@ -313,13 +327,13 @@
byte senderDir;
int numMerged = 0;
while (msgIterator.hasNext()) {
- LOG.info("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+// LOG.info("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
incomingMsg = msgIterator.next();
senderDir = (byte) (incomingMsg.getFlag() & DirectionFlag.DIR_MASK);
node.mergeWithNode(senderDir, incomingMsg.getNode());
state |= (byte) (incomingMsg.getFlag() & DIR.MASK); // update incoming restricted directions
numMerged++;
- LOG.info("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+// LOG.info("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
}
if(isTandemRepeat(getVertexValue())) {
state |= (DIR.NEXT.get() | DIR.PREVIOUS.get()); // tandem repeats can't merge anymore
@@ -333,9 +347,9 @@
@Override
public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
initVertex();
- if (getSuperstep() >= 4) {
- LOG.info("test");
- }
+// if (getSuperstep() >= 4) {
+// LOG.info("test");
+// }
if (getSuperstep() == 1) {
restrictNeighbors();