PathNodeInitial identifies nodes needing updates
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 54abda3..8955aab 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -27,6 +27,7 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+    public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
     private PositionWritable nodeID;
     private PositionListWritable forwardForwardList;
     private PositionListWritable forwardReverseList;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index 544bd37..ad8ead6 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.ToolRunner;
 
 import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -32,28 +33,10 @@
     /*
      * Flags used when sending messages
      */
-    public static class MergeMessageFlag {
-        public static final byte EMPTY_MESSAGE = 0;
-        public static final byte FROM_SELF = 1;
-        public static final byte FROM_SUCCESSOR = 1 << 1;
-        public static final byte FROM_PREDECESSOR = 1 << 2;
-        public static final byte IS_HEAD = 1 << 3;
-        public static final byte IS_TAIL = 1 << 4;
-        public static final byte IS_PSEUDOHEAD = 1 << 5;
-        public static final byte IS_COMPLETE = 1 << 6;
-
-        public static String getFlagAsString(byte code) {
-            // TODO: allow multiple flags to be set
-            switch (code) {
-                case EMPTY_MESSAGE:
-                    return "EMPTY_MESSAGE";
-                case FROM_SELF:
-                    return "FROM_SELF";
-                case FROM_SUCCESSOR:
-                    return "FROM_SUCCESSOR";
-            }
-            return "ERROR_BAD_MESSAGE";
-        }
+    public static class MergeMessageFlag extends PathNodeFlag {
+        public static final byte FROM_SUCCESSOR = 1 << 5;
+        public static final byte FROM_PREDECESSOR = 1 << 6;
+        public static final byte IS_PSEUDOHEAD = ((byte) 1 << 6); //TODO FIXME        
     }
 
     /*
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 68b727d..cb3466b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -45,13 +45,52 @@
 
 /*
  * A map-reduce job to find all nodes that are part of a simple path and the mark the nodes that
- * form their heads and tails.
+ * form their heads and tails, also identifies parts of the graph that will participate in a path merge.
+ * 
+ * This MR job uses MultipleOutputs rather than remapping the entire graph each iteration:
+ *   1. simple path nodes (indegree = outdegree = 1) (TO_MERGE_OUTPUT collector)
+ *   2. non-path, "complete" nodes, which will not be affected by the path merging (COMPLETE_OUTPUT collector)
+ *   3. non-path, "possibly updated" nodes, whose edges need to be updated after the merge (TO_UPDATE_OUTPUT collector)
  */
 @SuppressWarnings("deprecation")
 public class PathNodeInitial extends Configured implements Tool {
-	
-	public static final String COMPLETE_OUTPUT = "complete";
+
+    public static final String COMPLETE_OUTPUT = "complete";
     public static final String TO_MERGE_OUTPUT = "toMerge";
+    public static final String TO_UPDATE_OUTPUT = "toUpdate";
+    
+    public static class PathNodeFlag {
+        public static final byte EMPTY_MESSAGE = 0;
+        public static final byte FROM_SELF = 1 << 0;
+        public static final byte IS_HEAD = 1 << 1;
+        public static final byte IS_TAIL = 1 << 2;
+        public static final byte IS_COMPLETE = 1 << 3;
+        public static final byte NEAR_PATH = 1 << 4;
+    }
+
+    private static void sendOutputToNextNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
+            OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+        Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+        posIterator = node.getFRList().iterator(); // FRList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+    }
+
+    private static void sendOutputToPreviousNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
+            OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+        Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+        posIterator = node.getRFList().iterator(); // RFList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+    }
 
     public static class PathNodeInitialMapper extends MapReduceBase implements
             Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
@@ -61,171 +100,151 @@
         private MessageWritableNodeWithFlag outputValue;
         private int inDegree;
         private int outDegree;
-        private NodeWritable emptyNode;
-        private Iterator<PositionWritable> posIterator;
+        private boolean pathNode;
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputKey = new PositionWritable();
-            emptyNode = new NodeWritable();
         }
 
+        /*
+         * Identify the heads and tails of simple path nodes and their neighbors
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
         @Override
         public void map(NodeWritable key, NullWritable value,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
-        	
             inDegree = key.inDegree();
             outDegree = key.outDegree();
             if (inDegree == 1 && outDegree == 1) {
+                pathNode = true;
+            } else if (inDegree == 0 && outDegree == 1) {
+                pathNode = true;
+                // start of a tip.  needs to merge & be marked as head
+                outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+                output.collect(key.getNodeID(), outputValue);
+            } else if (inDegree == 1 && outDegree == 0) {
+                pathNode = true;
+                // end of a tip.  needs to merge & be marked as tail
+                outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+                output.collect(key.getNodeID(), outputValue);
+            } else {
+                pathNode = false;
+                if (outDegree > 0) {
+                    // Not a path myself, but my successor might be one. Map forward successor to find heads
+                    outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+                    sendOutputToNextNeighbors(key, outputValue, output);
+                }
+                if (inDegree > 0) {
+                    // Not a path myself, but my predecessor might be one. map predecessor to find tails 
+                    outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+                    sendOutputToPreviousNeighbors(key, outputValue, output);
+                }
+                // this non-path node won't participate in the merge
+                outputValue.set((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), key);
+                output.collect(key.getNodeID(), outputValue);
+            }
+
+            if (pathNode) {
                 // simple path nodes map themselves
                 outputValue.set(MergeMessageFlag.FROM_SELF, key);
                 output.collect(key.getNodeID(), outputValue);
                 reporter.incrCounter("genomix", "path_nodes", 1);
-            } else if (inDegree == 0 && outDegree == 1) {
-                // start of a tip.  needs to merge & be marked as head
-                outputValue.set(MergeMessageFlag.FROM_SELF, key);
-                output.collect(key.getNodeID(), outputValue);
-                reporter.incrCounter("genomix", "path_nodes", 1);
 
-                outputValue.set(MergeMessageFlag.FROM_PREDECESSOR, emptyNode);
-                output.collect(key.getNodeID(), outputValue);
-            } else if (inDegree == 1 && outDegree == 0) {
-                // end of a tip.  needs to merge & be marked as tail
-                outputValue.set(MergeMessageFlag.FROM_SELF, key);
-                output.collect(key.getNodeID(), outputValue);
-                reporter.incrCounter("genomix", "path_nodes", 1);
-
-                outputValue.set(MergeMessageFlag.FROM_SUCCESSOR, emptyNode);
-                output.collect(key.getNodeID(), outputValue);
-            } else {
-                if (outDegree > 0) {
-                    // Not a path myself, but my successor might be one. Map forward successor to find heads
-                    outputValue.set(MergeMessageFlag.FROM_PREDECESSOR, emptyNode);
-                    posIterator = key.getFFList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                    posIterator = key.getFRList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                }
-                if (inDegree > 0) {
-                    // Not a path myself, but my predecessor might be one. map predecessor to find tails 
-                    outputValue.set(MergeMessageFlag.FROM_SUCCESSOR, emptyNode);
-                    posIterator = key.getRRList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                    posIterator = key.getRFList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                }
-                // push this non-path node to the "complete" output
-                outputValue.set((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), key);
-                output.collect(key.getNodeID(), outputValue);
+                // also mark neighbors of paths (they are candidates for updates)
+                outputValue.set(MergeMessageFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
+                sendOutputToNextNeighbors(key, outputValue, output);
+                sendOutputToPreviousNeighbors(key, outputValue, output);
             }
         }
     }
 
     public static class PathNodeInitialReducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-    	protected MultipleOutputs mos;
-        protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
-        protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
-        protected int KMER_SIZE;
-        
+        private MultipleOutputs mos;
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toUpdateCollector;
+        private int KMER_SIZE;
+
         private MessageWritableNodeWithFlag inputValue;
         private MessageWritableNodeWithFlag outputValue;
         private NodeWritable nodeToKeep;
-        private int count;
-        private byte flag;
+        private byte outputFlag;
+        private byte inputFlag;
 
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            nodeToKeep = new NodeWritable(KMER_SIZE);            
+            nodeToKeep = new NodeWritable(KMER_SIZE);
         }
 
-        @SuppressWarnings("unchecked")         
-		@Override
+        /*
+         * Segregate nodes into three bins:
+         *   1. mergeable nodes (marked as H/T)
+         *   2. non-mergeable nodes that are candidates for updates
+         *   3. non-mergeable nodes that are not path neighbors and won't be updated
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
+        @SuppressWarnings("unchecked")
+        @Override
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
-        	completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
-        	toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
-        	
-            inputValue.set(values.next());
-            if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
-                    if ((inputValue.getFlag() & MergeMessageFlag.IS_COMPLETE) > 0) {
-                        // non-path node.  Store in "complete" output
-                    	completeCollector.collect(key, inputValue);
+            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+            toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+            toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
+
+            outputFlag = MergeMessageFlag.EMPTY_MESSAGE;
+            while (values.hasNext()) {
+                inputValue.set(values.next());
+                inputFlag = inputValue.getFlag();
+                outputFlag |= inputFlag;
+
+                if ((inputFlag & MergeMessageFlag.FROM_SELF) > 0) {
+                    // SELF -> keep this node
+                    nodeToKeep.set(inputValue.getNode());
+                }
+            }
+
+            if ((outputFlag & MergeMessageFlag.FROM_SELF) > 0) {
+                if ((outputFlag & MergeMessageFlag.IS_COMPLETE) > 0) {
+                    if ((outputFlag & MergeMessageFlag.NEAR_PATH) > 0) {
+                        // non-path, but update candidate
+                        outputValue.set(MergeMessageFlag.NEAR_PATH, nodeToKeep);
+                        toUpdateCollector.collect(key, outputValue);
                     } else {
-                        // FROM_SELF => need to keep this PATH node
-                    	toMergeCollector.collect(key, inputValue);
+                        // non-path node.  Store in "complete" output
+                        outputValue.set(MergeMessageFlag.EMPTY_MESSAGE, nodeToKeep);
+                        completeCollector.collect(key, outputValue);
+                    }
+                } else {
+                    // path nodes are mergeable
+                    outputFlag &= (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_TAIL); // clear flags besides H/T 
+                    outputValue.set(outputFlag, nodeToKeep);
+                    toMergeCollector.collect(key, outputValue);
+
+                    reporter.incrCounter("genomix", "path_nodes", 1);
+                    if ((outputFlag & MergeMessageFlag.IS_HEAD) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                    }
+                    if ((outputFlag & MergeMessageFlag.IS_TAIL) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
                     }
                 }
             } else {
-                // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node 
-                count = 0;
-                flag = MergeMessageFlag.EMPTY_MESSAGE;
-                while (true) { // process values; break when no more
-                    count++;
-                    if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
-                        // SELF -> keep this node
-                        flag |= MergeMessageFlag.FROM_SELF;
-                        nodeToKeep.set(inputValue.getNode());
-                        if ((inputValue.getFlag() & MergeMessageFlag.IS_COMPLETE) > 0) {
-                            flag |= MergeMessageFlag.IS_COMPLETE;
-                        }
-                    } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SUCCESSOR) > 0) {
-                        flag |= MergeMessageFlag.IS_TAIL;
-                    } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) > 0) {
-                        flag |= MergeMessageFlag.IS_HEAD;
-                    }
-                    if (!values.hasNext()) {
-                        break;
-                    } else {
-                        inputValue.set(values.next());
-                    }
-                }
-                if (count < 2) {
-                    throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
-                            + String.valueOf(count));
-                }
-                if ((flag & MergeMessageFlag.FROM_SELF) > 0) {
-                    if ((flag & MergeMessageFlag.IS_COMPLETE) > 0) {
-                        // non-path node.  Store in "complete" output
-                    	completeCollector.collect(key, inputValue);
-                    } else {
-                        // only keep simple path nodes
-                        outputValue.set(flag, nodeToKeep);
-                        toMergeCollector.collect(key, outputValue);
-
-                        reporter.incrCounter("genomix", "path_nodes", 1);
-                        if ((flag & MergeMessageFlag.IS_HEAD) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_heads", 1);
-                        }
-                        if ((flag & MergeMessageFlag.IS_TAIL) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                        }
-                    }
-                } else {
-                    throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
-                }
+                throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
             }
         }
-        
+
         public void close() throws IOException {
             mos.close();
         }
@@ -234,13 +253,14 @@
     /*
      * Mark the head, tail, and simple path nodes in one map-reduce job.
      */
-    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+            JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(PathNodeInitial.class);
         conf.setJobName("PathNodeInitial " + inputPath);
 
         FileInputFormat.addInputPaths(conf, inputPath);
-        Path outputPath = new Path(inputPath.replaceAll("/$",  "") + ".initialMerge.tmp");
+        Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
         FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
@@ -253,26 +273,29 @@
 
         conf.setMapperClass(PathNodeInitialMapper.class);
         conf.setReducerClass(PathNodeInitialReducer.class);
-        
-        MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, MessageWritableNodeWithFlag.class);
-        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, MessageWritableNodeWithFlag.class);
 
-        FileSystem dfs = FileSystem.get(conf); 
+        MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, MessageWritableNodeWithFlag.class);
+        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, MessageWritableNodeWithFlag.class);
+        MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, MessageWritableNodeWithFlag.class);
+
+        FileSystem dfs = FileSystem.get(conf);
         dfs.delete(outputPath, true); // clean output dir
         RunningJob job = JobClient.runJob(conf);
-        
+
         // move the tmp outputs to the arg-spec'ed dirs
-        dfs.rename(new Path(outputPath + File.separator +  TO_MERGE_OUTPUT), new Path(toMergeOutput));
-        dfs.rename(new Path(outputPath + File.separator +  COMPLETE_OUTPUT), new Path(completeOutput));
-        
+        dfs.rename(new Path(outputPath + File.separator + TO_MERGE_OUTPUT), new Path(toMergeOutput));
+        dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
+        dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+
         return job;
     }
 
     @Override
     public int run(String[] args) throws Exception {
-    	int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+        int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
         return res;
     }