add option for verbose and log (one of) the problem kmer(s)
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 4529d4a..1c0a023 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -21,8 +21,10 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.AbstractMap;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.Map;
 
 import org.apache.hadoop.io.WritableComparable;
 
@@ -739,6 +741,19 @@
                 break;
         }
     }
+    
+    /**
+     * Debug helper function to find the edge associated with the given kmer
+     */
+    public Map.Entry<Byte, EdgeWritable> findEdge(final VKmerBytesWritable kmer) {
+        for (byte dir : DirectionFlag.values) {
+            for (EdgeWritable e : edges[dir]) {
+                if (e.getKey().equals(kmer))
+                    return new AbstractMap.SimpleEntry<Byte, EdgeWritable>(dir, e);
+            }
+        }
+        return null;
+    }
 
     public int inDegree() {
         return edges[DirectionFlag.DIR_RR].getCountOfPosition() + edges[DirectionFlag.DIR_RF].getCountOfPosition();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index d71bfa3..c3585f4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,10 +1,14 @@
 package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Random;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.StringUtils;
+import org.jfree.util.Log;
+
 import edu.uci.ics.genomix.config.GenomixJobConf;
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -21,6 +25,7 @@
 import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
 import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
 import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Graph clean pattern: P4(Smart-algorithm) for path merge 
@@ -30,7 +35,7 @@
 public class P4ForPathMergeVertex extends
     BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
     
-//    private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
+    private static final Logger LOG = Logger.getLogger(P4ForPathMergeVertex.class.getName());
     
     private static long randSeed = 1; //static for save memory
     private float probBeingRandomHead = -1;
@@ -47,6 +52,9 @@
     private byte nextDir;
     private byte prevDir;
     
+    private static final VKmerBytesWritable problemKmer = new VKmerBytesWritable("CCCGGCCTCCAGCGTGGGATACGCGAAGATGCCGCCGTAGGTGAGAATCTGGTTC");
+    private boolean verbose;
+    
     /**
      * initiate kmerSize, maxIteration
      */
@@ -83,6 +91,8 @@
 //            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
+        
+        verbose = getVertexValue().getNode().findEdge(problemKmer) != null || getVertexId().equals(problemKmer);
     }
     
     /**
@@ -123,10 +133,11 @@
         for (DIR dir : dirsToRestrict) {
             for (byte d : NodeWritable.edgeTypesInDir(dir)) {
                 for (VKmerBytesWritable destId : vertex.getEdgeList(d).getKeys()) {
+                    verbose |= destId.equals(problemKmer);
                     outgoingMsg.reset();
-//                    outgoingMsg.setFlag(dir.mirror().get());
                     outgoingMsg.setFlag(DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)).get());
-//                    LOG.info("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
+                    if (verbose)
+                        LOG.fine("send restriction from " + getVertexId() + " to " + destId + " in my " + d + " and their " + DirectionFlag.mirrorEdge(d) + " (" + DirectionFlag.dirFromEdgeType(DirectionFlag.mirrorEdge(d)) + "); I am " + getVertexValue());
                     sendMsg(destId, outgoingMsg);
                 }
             }
@@ -140,10 +151,13 @@
         short restrictedDirs = 0;  // the directions (NEXT/PREVIOUS) that I'm not allowed to merge in
         boolean updated = false;
         while (msgIterator.hasNext()) {
-//            LOG.info("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
             incomingMsg = msgIterator.next();
+            verbose |= incomingMsg.getNode().findEdge(problemKmer) != null || incomingMsg.getSourceVertexId().equals(problemKmer);
+            if (verbose)
+                LOG.fine("before restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
             restrictedDirs |= incomingMsg.getFlag();
-//            LOG.info("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
+            if (verbose)
+                LOG.fine("after restriction " + getVertexId() + ": " + DIR.fromByte(restrictedDirs));
             updated = true;
         }
         if (updated) {
@@ -161,7 +175,8 @@
         for(int i = 0; i < 500; i++)
             randGenerator.nextFloat();
         boolean isHead = randGenerator.nextFloat() < probBeingRandomHead;
-//        LOG.info("randomHead: " + nodeKmer + "=" + isHead);
+        if (verbose)
+            LOG.fine("randomHead: " + nodeKmer + "=" + isHead);
         return isHead;
     }
     
@@ -235,11 +250,13 @@
                 }
             }
         }
-//        if ((getVertexValue().getState() & P4State.MERGE) == 0) {
-//            LOG.info("No merge for " + getVertexId());
-//        } else {
-//            LOG.info("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
-//        }
+        if (verbose) {
+            if ((getVertexValue().getState() & P4State.MERGE) == 0) {
+                LOG.fine("No merge for " + getVertexId());
+            } else {
+                LOG.fine("Merge from " + getVertexId() + " towards " + (getVertexValue().getState() & DirectionFlag.DIR_MASK) + "; node is " + getVertexValue());
+            }
+        }
     }
     
     public void updateNeighbors() {
@@ -265,31 +282,44 @@
                 byte newDir = DirectionFlag.resolveLinkThroughMiddleNode(updateEdge, mergeEdge);
                 outgoingMsg.getNode().setEdgeList(newDir, getVertexValue().getEdgeList(mergeEdge));  // copy into outgoingMsg
             }
+            verbose |= outgoingMsg.getNode().findEdge(problemKmer) != null;
             
             // send the update to all kmers in this list // TODO perhaps we could skip all this if there are no neighbors here
             for (VKmerBytesWritable dest : vertex.getEdgeList(updateEdge).getKeys()) {
-//                LOG.info("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
+                if (verbose)
+                    LOG.fine("send update message from " + getVertexId() + " to " + dest + ": " + outgoingMsg);
                 sendMsg(dest, outgoingMsg);
             }
         }
     }
     
-    public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator){
+    public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator) throws HyracksDataException{
         VertexValueWritable vertex = getVertexValue(); 
         NodeWritable node = vertex.getNode();
         boolean updated = false;
+        ArrayList<PathMergeMessageWritable> allSeenMsgs = new ArrayList<PathMergeMessageWritable>();
         while (msgIterator.hasNext()) {
-//            LOG.info("before update from neighbor: " + getVertexValue());
             incomingMsg = msgIterator.next();
+            verbose |= incomingMsg.getNode().findEdge(problemKmer) != null || incomingMsg.getSourceVertexId().equals(problemKmer);
+            if (verbose)
+                LOG.fine("before update from neighbor: " + getVertexValue());
             // remove the edge to the node that will merge elsewhere
-            node.getEdgeList((byte)(incomingMsg.getFlag() & DirectionFlag.DIR_MASK)).remove(incomingMsg.getSourceVertexId());
+            try {
+                node.getEdgeList((byte)(incomingMsg.getFlag() & DirectionFlag.DIR_MASK)).remove(incomingMsg.getSourceVertexId());
+            } catch (ArrayIndexOutOfBoundsException e) {
+                throw new HyracksDataException("In update, tried to remove an edge that doesn't exist...\nvertex: " + vertex + "\nremoving " + incomingMsg.getSourceVertexId() + " from dir " + (incomingMsg.getFlag() & DirectionFlag.DIR_MASK) + "\nupdate node: " + incomingMsg.getNode() + "\npreviously recieved messages this iteration: \n{\n" + StringUtils.join(allSeenMsgs, "\n") + "\n}\n", e);
+            }
             // add the node this neighbor will merge into
             for (byte dir : DirectionFlag.values) {
                 node.getEdgeList(dir).unionUpdate(incomingMsg.getEdgeList(dir));
             }
             updated = true;
-//            LOG.info("after update from neighbor: " + getVertexValue());
+            allSeenMsgs.add(incomingMsg);
+            if (verbose) 
+                LOG.fine("after update from neighbor: " + getVertexValue());
         }
+        if (verbose)
+            LOG.fine("All recieved updates:  \n{\n" + StringUtils.join(allSeenMsgs, "\n") + "\n}\n");
         if (updated) {
             if (DIR.enumSetFromByte(vertex.getState()).containsAll(EnumSet.allOf(DIR.class)))
                 voteToHalt();
@@ -319,10 +349,13 @@
             if (vertex.getDegree(DirectionFlag.dirFromEdgeType(mergeDir)) != 1)
                 throw new IllegalStateException("Merge attempted in node with degree in " + mergeDir + " direction != 1!\n" + vertex);
             VKmerBytesWritable dest = vertex.getEdgeList(mergeDir).get(0).getKey();
-//            LOG.info("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
+            verbose |= outgoingMsg.getNode().findEdge(problemKmer) != null || dest.equals(problemKmer);
+            if (verbose)
+                LOG.fine("send merge mesage from " + getVertexId() + " to " + dest + ": " + outgoingMsg + "; my restrictions are: " + DIR.enumSetFromByte(vertex.getState()) + ", their restrictions are: " + DIR.enumSetFromByte(outgoingMsg.getFlag()));
             sendMsg(dest, outgoingMsg);
             
-//            LOG.info("killing self: " + getVertexId());
+            if (verbose)
+                LOG.fine("killing self: " + getVertexId());
             deleteVertex(getVertexId());
         }
     }
@@ -338,20 +371,25 @@
         byte senderDir;
         int numMerged = 0;
         while (msgIterator.hasNext()) {
-//            LOG.info("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
             incomingMsg = msgIterator.next();
+            verbose |= incomingMsg.getNode().findEdge(problemKmer) != null;
+            if (verbose)
+                LOG.fine("before merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
             senderDir = (byte) (incomingMsg.getFlag() & DirectionFlag.DIR_MASK);
             node.mergeWithNode(senderDir, incomingMsg.getNode());
             state |= (byte) (incomingMsg.getFlag() & DIR.MASK);  // update incoming restricted directions
             numMerged++;
             updated = true;
-//            LOG.info("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
+            if (verbose)
+                LOG.fine("after merge: " + getVertexValue() + " restrictions: " + DIR.enumSetFromByte(state));
         }
         if(isTandemRepeat(getVertexValue())) {
             // tandem repeats can't merge anymore; restrict all future merges
             state |= DIR.NEXT.get();
             state |= DIR.PREVIOUS.get();
             updated = true;
+            if (verbose)
+                LOG.fine("recieveMerges is a tandem repeat: " + getVertexId() + " " + getVertexValue());
 //          updateStatisticsCounter(StatisticsCounter.Num_Cycles); 
         }
 //      updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
@@ -366,11 +404,8 @@
     }
     
     @Override
-    public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
+    public void compute(Iterator<PathMergeMessageWritable> msgIterator) throws HyracksDataException {
         initVertex();
-//        if (getSuperstep() >= 4) {
-//            LOG.info("test");
-//        }
             
         if (getSuperstep() == 1) {
             restrictNeighbors();