path node works for complex starts
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 063c5f7..2ced8dd 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -59,6 +60,7 @@
         private byte outFlag;
 
         public void configure(JobConf conf) {
+            
             randSeed = conf.getLong("randomSeed", 0);
             randGenerator = new Random(randSeed);
             probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -116,24 +118,35 @@
         public void map(PositionWritable key, MessageWritableNodeWithFlag value,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
+            // Node may be marked as head b/c it's a real head or a real tail
+            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+            tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
+            outFlag = (byte) (headFlag | tailFlag);
+            
             // only PATH vertices are present. Find the ID's for my neighbors
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
+            
             curHead = isNodeRandomHead(curID);
-            hasNext = setNextInfo(curNode);
-            hasPrev = setPrevInfo(curNode);
+            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
+            // We prevent merging towards non-path nodes
+            hasNext = setNextInfo(curNode) && tailFlag == 0;
+            hasPrev = setPrevInfo(curNode) && headFlag == 0;
             willMerge = false;
             
             reporter.setStatus("CHECK ME OUT");
-            System.out.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+            System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
 
             // TODO: need to update edges in neighboring nodes
-
+            
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+                // true HEAD met true TAIL. this path is complete
+                outFlag |= MessageFlag.FROM_SELF;
+                outputValue.set(outFlag, curNode);
+                output.collect(curID, outputValue);
+                return;
+            }
             if (hasNext || hasPrev) {
-                // Node may be marked as head b/c it's a real head or a real tail
-                headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
-                tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
-                outFlag = (byte) (headFlag | tailFlag);
                 if (curHead) {
                     if (hasNext && !nextHead) {
                         // compress this head to the forward tail
@@ -181,7 +194,7 @@
                 }
             }
 
-            // if we didn't send ourselves to some other node, remap ourselves
+            // if we didn't send ourselves to some other node, remap ourselves for the next round
             if (!willMerge) {
                 outFlag |= MessageFlag.FROM_SELF;
                 outputValue.set(outFlag, curNode);
@@ -195,7 +208,9 @@
      */
     private static class MergePathsH4Reducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+        private MultipleOutputs mos;
+        public static final String COMPLETE_OUTPUT = "complete";
+        
         private int KMER_SIZE;
         private MessageWritableNodeWithFlag inputValue;
         private MessageWritableNodeWithFlag outputValue;
@@ -209,13 +224,16 @@
         private byte outFlag;
 
         public void configure(JobConf conf) {
+            mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
+            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -223,10 +241,14 @@
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                // all single nodes must be remapped
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // FROM_SELF => remap self
-                    output.collect(key, inputValue);
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+                        // complete path (H & T meet in this node)
+                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    } else { 
+                        // FROM_SELF => no merging this round. remap self
+                        output.collect(key, inputValue);
+                    }
                 } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
                     // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
                     throw new IOException("Only one value recieved in merge, but it wasn't from self!");
@@ -246,10 +268,12 @@
                         sawPrevNode = true;
                     } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         nextNode.set(inputValue.getNode());
-                        sawNextNode = false;
-                    } else {
+                        sawNextNode = true;
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                         curNode.set(inputValue.getNode());
                         sawCurNode = true;
+                    } else {
+                        throw new IOException("Unknown origin for merging node");
                     }
                     if (!values.hasNext()) {
                         break;
@@ -279,7 +303,8 @@
                 outputValue.set(outFlag, curNode);
                 if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
-                    // TODO: send to the "complete" collector
+                    mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+                    // TODO send update to this node's neighbors
                 } else {
                     output.collect(key, outputValue);
                 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index e7bcdf6..c8386ea 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -57,6 +57,7 @@
         private int inDegree;
         private int outDegree;
         private NodeWritable emptyNode;
+        private Iterator<PositionWritable> posIterator;
 
         public PathNodeInitialMapper() {
 
@@ -80,26 +81,51 @@
                 outputValue.set(MessageFlag.FROM_SELF, key);
                 output.collect(key.getNodeID(), outputValue);
                 reporter.incrCounter("genomix", "path_nodes", 1);
-            } else if (outDegree == 1) {
-                // Not a path myself, but my successor might be one. Map forward successor
+            } else if (inDegree == 0 && outDegree == 1) {
+                // start of a tip.  needs to merge & be marked as head
+                outputValue.set(MessageFlag.FROM_SELF, key);
+                output.collect(key.getNodeID(), outputValue);
+                reporter.incrCounter("genomix", "path_nodes", 1);
+
                 outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
-                if (key.getFFList().getCountOfPosition() > 0) {
-                    outputKey.set(key.getFFList().getPosition(0));
-                } else {
-                    outputKey.set(key.getFRList().getPosition(0));
-                }
-                output.collect(outputKey, outputValue);
-            } else if (inDegree == 1) {
-                // Not a path myself, but my predecessor might be one.
+                output.collect(key.getNodeID(), outputValue);
+            } else if (inDegree == 1 && outDegree == 0) {
+                // end of a tip.  needs to merge & be marked as tail
+                outputValue.set(MessageFlag.FROM_SELF, key);
+                output.collect(key.getNodeID(), outputValue);
+                reporter.incrCounter("genomix", "path_nodes", 1);
+
                 outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
-                if (key.getRRList().getCountOfPosition() > 0) {
-                    outputKey.set(key.getRRList().getPosition(0));
-                } else {
-                    outputKey.set(key.getRFList().getPosition(0));
-                }
-                output.collect(outputKey, outputValue);
+                output.collect(key.getNodeID(), outputValue);
             } else {
-                // TODO: all other nodes will not participate-- should they be collected in a "complete" output?
+                if (outDegree > 0) {
+                    // Not a path myself, but my successor might be one. Map forward successor to find heads
+                    outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+                    posIterator = key.getFFList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                    posIterator = key.getFRList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                }
+                if (inDegree > 0) {
+                    // Not a path myself, but my predecessor might be one. map predecessor to find tails 
+                    outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+                    posIterator = key.getRRList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                    posIterator = key.getRFList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                }
             }
         }
     }
@@ -128,7 +154,7 @@
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                     // FROM_SELF => need to keep this PATH node
                     output.collect(key, inputValue);
                 }
@@ -138,15 +164,14 @@
                 flag = MessageFlag.EMPTY_MESSAGE;
                 while (true) { // process values; break when no more
                     count++;
-                    if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+                    if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                         // SELF -> keep this node
+                        flag |= MessageFlag.FROM_SELF;
                         nodeToKeep.set(inputValue.getNode());
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) == MessageFlag.FROM_SUCCESSOR) {
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         flag |= MessageFlag.IS_TAIL;
-                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
                         flag |= MessageFlag.IS_HEAD;
-                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
                     }
                     if (!values.hasNext()) {
                         break;
@@ -158,11 +183,21 @@
                     throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
                             + String.valueOf(count));
                 }
-                if ((flag & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // only map simple path nodes
+                if ((flag & MessageFlag.FROM_SELF) > 0) {
+                    // only keep simple path nodes
                     outputValue.set(flag, nodeToKeep);
                     output.collect(key, outputValue);
+                    
                     reporter.incrCounter("genomix", "path_nodes", 1);
+                    if ((flag & MessageFlag.IS_HEAD) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                    }
+                    if ((flag & MessageFlag.IS_TAIL) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
+                    }
+                } else {
+                    // this is a non-path node.
+                    // TODO: keep this node in a "completed" reducer
                 }
             }
         }