PathNodeInitial uses 2 output bins instead of 3
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 3c46dc7..c1d8a71 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,8 +34,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -57,7 +57,6 @@
 
     public static final String COMPLETE_OUTPUT = "complete";
     public static final String TO_MERGE_OUTPUT = "toMerge";
-    public static final String TO_UPDATE_OUTPUT = "toUpdate";
 
     private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
 
@@ -89,7 +88,6 @@
             Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private PositionWritable outputKey;
         private NodeWithFlagWritable outputValue;
         private int inDegree;
         private int outDegree;
@@ -98,7 +96,6 @@
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
-            outputKey = new PositionWritable();
         }
 
         /*
@@ -159,7 +156,6 @@
             Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
         private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
-        private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
         private int KMER_SIZE;
 
         private NodeWithFlagWritable inputValue;
@@ -192,7 +188,6 @@
                 OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
                 throws IOException {
             completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
-            toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
 
             outputFlag = MessageFlag.EMPTY_MESSAGE;
             sawSelf = false;
@@ -211,35 +206,34 @@
                     nodeToKeep.set(inputValue.getNode());
                 }
             }
+            if (!sawSelf) {
+                throw new IOException("Didn't see a self node in PathNodeInitial! flag: " + outputFlag);
+            }
 
-            if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
-                if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
-                    // non-path or single path nodes
-                    if ((outputFlag & NEAR_PATH) > 0) {
-                        // non-path, but an update candidate
-                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
-                        toUpdateCollector.collect(key, outputValue);
-                    } else {
-                        // non-path or single-node path.  Store in "complete" output
-                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
-                        completeCollector.collect(key, outputValue);
-                    }
-                } else {
-                    // path nodes that are mergeable
-                    outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
-                    outputValue.set(outputFlag, nodeToKeep);
+            if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+                // non-path or single path nodes
+                if ((outputFlag & NEAR_PATH) > 0) {
+                    // non-path, but an update candidate
+                    outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
                     toMergeCollector.collect(key, outputValue);
-
-                    reporter.incrCounter("genomix", "path_nodes", 1);
-                    if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
-                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
-                    }
-                    if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
-                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                    }
+                } else {
+                    // non-path or single-node path.  Store in "complete" output
+                    outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+                    completeCollector.collect(key, outputValue);
                 }
             } else {
-                throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
+                // path nodes that are mergeable
+                outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+                outputValue.set(outputFlag, nodeToKeep);
+                toMergeCollector.collect(key, outputValue);
+
+                reporter.incrCounter("genomix", "path_nodes", 1);
+                if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+                    reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                }
+                if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+                    reporter.incrCounter("genomix", "path_nodes_tails", 1);
+                }
             }
         }
 
@@ -251,41 +245,30 @@
     /*
      * Mark the head, tail, and simple path nodes in one map-reduce job.
      */
-    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
-            JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(PathNodeInitial.class);
         conf.setJobName("PathNodeInitial " + inputPath);
-
-        FileInputFormat.addInputPaths(conf, inputPath);
-        //        Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
-        Path outputPath = new Path(toMergeOutput);
-        FileOutputFormat.setOutputPath(conf, outputPath);
-
-        conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(NullOutputFormat.class);
-
         conf.setMapOutputKeyClass(PositionWritable.class);
         conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
         conf.setOutputValueClass(NodeWithFlagWritable.class);
-
-        conf.setMapperClass(PathNodeInitialMapper.class);
-        conf.setReducerClass(PathNodeInitialReducer.class);
-
+        
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        FileInputFormat.addInputPaths(conf, inputPath);
+        FileOutputFormat.setOutputPath(conf, new Path(toMergeOutput));
         MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
                 PositionWritable.class, NodeWithFlagWritable.class);
-        MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, NodeWithFlagWritable.class);
-
         FileSystem dfs = FileSystem.get(conf);
-        dfs.delete(outputPath, true); // clean output dir
+        dfs.delete(new Path(toMergeOutput), true); // clean output dir
+        
+        conf.setMapperClass(PathNodeInitialMapper.class);
+        conf.setReducerClass(PathNodeInitialReducer.class);
         RunningJob job = JobClient.runJob(conf);
 
         // move the tmp outputs to the arg-spec'ed dirs
-        dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
-        dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
-        //        dfs.rename(outputPath, new Path(toMergeOutput));
+        dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
 
         return job;
     }