Add a verbose flag that enables logging for vertices and messages involving (one of) the problem kmer(s)
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 4529d4a..1c0a023 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -21,8 +21,10 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.util.AbstractMap;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.Map;
import org.apache.hadoop.io.WritableComparable;
@@ -739,6 +741,19 @@
break;
}
}
+
+ /** Debug helper: scans the edge list of every direction in DirectionFlag.values and stops at the first edge whose key equals {@code kmer}.
+ * @return a Map.Entry pairing the direction byte with the matching edge, or null when no edge list contains the kmer
+ */
+ public Map.Entry<Byte, EdgeWritable> findEdge(final VKmerBytesWritable kmer) {
+ for (byte dir : DirectionFlag.values) {
+ for (EdgeWritable e : edges[dir]) {
+ if (e.getKey().equals(kmer))
+ return new AbstractMap.SimpleEntry<Byte, EdgeWritable>(dir, e);
+ }
+ }
+ return null;
+ }
public int inDegree() {
return edges[DirectionFlag.DIR_RR].getCountOfPosition() + edges[DirectionFlag.DIR_RF].getCountOfPosition();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index d71bfa3..c3585f4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,10 +1,14 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Random;
import java.util.logging.Logger;
+import org.apache.commons.lang3.StringUtils;
+import org.jfree.util.Log;
+
import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -21,6 +25,7 @@
import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* Graph clean pattern: P4(Smart-algorithm) for path merge
@@ -30,7 +35,7 @@
public class P4ForPathMergeVertex extends
BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
-// private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
+ private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
private static long randSeed = 1; //static for save memory
private float probBeingRandomHead = -1;
@@ -47,6 +52,9 @@
private byte nextDir;
private byte prevDir;
+ private static final VKmerBytesWritable problemKmer = new VKmerBytesWritable("CCCGGCCTCCAGCGTGGGATACGCGAAGATGCCGCCGTAGGTGAGAATCTGGTTC");
+ private boolean verbose;
+
/**
* initiate kmerSize, maxIteration
*/
@@ -83,6 +91,8 @@
// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
+
+ verbose = getVertexValue().getNode().findEdge(problemKmer) != null || getVertexId().equals(problemKmer);
}
/**
@@ -123,10 +133,11 @@
for (DIR dir : dirsToRestrict) {
for (byte d : NodeWritable.edgeTypesInDir(dir)) {
for (VKmerBytesWritable destId : vertex.getEdgeList(d).getKeys()) {
+ verbose |= destId.equals(problemKmer);
outgoingMsg.reset();
-// outgoingMsg.setFlag(dir.mirror().get());
outgoingMsg.setFlag(DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)).get());
-// LOG.info("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
+ if (verbose)
+ LOG.fine("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
sendMsg(destId, outgoingMsg);
}
}
@@ -140,10 +151,13 @@
short restrictedDirs = 0; // the directions (NEXT/PREVIOUS) that I'm not allowed to merge in
boolean updated = false;
while (msgIterator.hasNext()) {
-// LOG.info("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
incomingMsg = msgIterator.next();
+ verbose |= incomingMsg.getNode().findEdge(problemKmer) != null || incomingMsg.getSourceVertexId().equals(problemKmer);
+ if (verbose)
+ LOG.fine("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
restrictedDirs |= incomingMsg.getFlag();
-// LOG.info("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
+ if (verbose)
+ LOG.fine("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
updated = true;
}
if (updated) {
@@ -161,7 +175,8 @@
for(int i = 0; i < 500; i++)
randGenerator.nextFloat();
boolean isHead = randGenerator.nextFloat() < probBeingRandomHead;
-// LOG.info("randomHead: " + nodeKmer + "=" + isHead);
+ if (verbose)
+ LOG.fine("randomHead: " + nodeKmer + "=" + isHead);
return isHead;
}
@@ -235,11 +250,13 @@
}
}
}
-// if ((getVertexValue().getState() & P4State.MERGE) == 0) {
-// LOG.info("No merge for " + getVertexId());
-// } else {
-// LOG.info("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
-// }
+ if (verbose) {
+ if ((getVertexValue().getState() & P4State.MERGE) == 0) {
+ LOG.fine("No merge for " + getVertexId());
+ } else {
+ LOG.fine("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
+ }
+ }
}
public void updateNeighbors() {
@@ -265,31 +282,44 @@
byte newDir = DirectionFlag.resolveLinkThroughMiddleNode(updateEdge, mergeEdge);
outgoingMsg.getNode().setEdgeList(newDir, getVertexValue().getEdgeList(mergeEdge)); // copy into outgoingMsg
}
+ verbose |= outgoingMsg.getNode().findEdge(problemKmer) != null;
// send the update to all kmers in this list // TODO perhaps we could skip all this if there are no neighbors here
for (VKmerBytesWritable dest : vertex.getEdgeList(updateEdge).getKeys()) {
-// LOG.info("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
+ if (verbose)
+ LOG.fine("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
sendMsg(dest, outgoingMsg);
}
}
}
- public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator){
+ public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator) throws HyracksDataException{
VertexValueWritable vertex = getVertexValue();
NodeWritable node = vertex.getNode();
boolean updated = false;
+ ArrayList<PathMergeMessageWritable> allSeenMsgs = new ArrayList<PathMergeMessageWritable>();
while (msgIterator.hasNext()) {
-// LOG.info("before update from neighbor: " + getVertexValue());
incomingMsg = msgIterator.next();
+ verbose |= incomingMsg.getNode().findEdge(problemKmer) != null || incomingMsg.getSourceVertexId().equals(problemKmer);
+ if (verbose)
+ LOG.fine("before update from neighbor: " + getVertexValue());
// remove the edge to the node that will merge elsewhere
- node.getEdgeList((byte)(incomingMsg.getFlag() & DirectionFlag.DIR_MASK)).remove(incomingMsg.getSourceVertexId());
+ try {
+ node.getEdgeList((byte)(incomingMsg.getFlag() & DirectionFlag.DIR_MASK)).remove(incomingMsg.getSourceVertexId());
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new HyracksDataException("In update, tried to remove an edge that doesn't exist...\nvertex: " + vertex + "\nremoving " + incomingMsg.getSourceVertexId() + " from dir " + (incomingMsg.getFlag() & DirectionFlag.DIR_MASK) + "\nupdate node: " + incomingMsg.getNode() + "\npreviously recieved messages this iteration: \n{\n" + StringUtils.join(allSeenMsgs, "\n") + "\n}\n", e);
+ }
// add the node this neighbor will merge into
for (byte dir : DirectionFlag.values) {
node.getEdgeList(dir).unionUpdate(incomingMsg.getEdgeList(dir));
}
updated = true;
-// LOG.info("after update from neighbor: " + getVertexValue());
+ allSeenMsgs.add(incomingMsg);
+ if (verbose)
+ LOG.fine("after update from neighbor: " + getVertexValue());
}
+ if (verbose)
+ LOG.fine("All recieved updates: \n{\n" + StringUtils.join(allSeenMsgs, "\n") + "\n}\n");
if (updated) {
if (DIR.enumSetFromByte(vertex.getState()).containsAll(EnumSet.allOf(DIR.class)))
voteToHalt();
@@ -319,10 +349,13 @@
if (vertex.getDegree(DirectionFlag.dirFromEdgeType(mergeDir)) != 1)
throw new IllegalStateException("Merge attempted in node with degree in " + mergeDir + " direction != 1!\n" + vertex);
VKmerBytesWritable dest = vertex.getEdgeList(mergeDir).get(0).getKey();
-// LOG.info("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
+ verbose |= outgoingMsg.getNode().findEdge(problemKmer) != null || dest.equals(problemKmer);
+ if (verbose)
+ LOG.fine("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
sendMsg(dest, outgoingMsg);
-// LOG.info("killing self: " + getVertexId());
+ if (verbose)
+ LOG.fine("killing self: " + getVertexId());
deleteVertex(getVertexId());
}
}
@@ -338,20 +371,25 @@
byte senderDir;
int numMerged = 0;
while (msgIterator.hasNext()) {
-// LOG.info("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
incomingMsg = msgIterator.next();
+ verbose |= incomingMsg.getNode().findEdge(problemKmer) != null;
+ if (verbose)
+ LOG.fine("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
senderDir = (byte) (incomingMsg.getFlag() & DirectionFlag.DIR_MASK);
node.mergeWithNode(senderDir, incomingMsg.getNode());
state |= (byte) (incomingMsg.getFlag() & DIR.MASK); // update incoming restricted directions
numMerged++;
updated = true;
-// LOG.info("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+ if (verbose)
+ LOG.fine("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
}
if(isTandemRepeat(getVertexValue())) {
// tandem repeats can't merge anymore; restrict all future merges
state |= DIR.NEXT.get();
state |= DIR.PREVIOUS.get();
updated = true;
+ if (verbose)
+ LOG.fine("recieveMerges is a tandem repeat: " + getVertexId() + " " + getVertexValue());
// updateStatisticsCounter(StatisticsCounter.Num_Cycles);
}
// updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
@@ -366,11 +404,8 @@
}
@Override
- public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
+ public void compute(Iterator<PathMergeMessageWritable> msgIterator) throws HyracksDataException {
initVertex();
-// if (getSuperstep() >= 4) {
-// LOG.info("test");
-// }
if (getSuperstep() == 1) {
restrictNeighbors();