MergePathsH4/PathNodeInitial: use MultipleOutputs properly (prep for update code)
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index b240833..926afe1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
@@ -21,10 +22,15 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
@@ -213,8 +219,12 @@
     private static class MergePathsH4Reducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
         private MultipleOutputs mos;
-        public static final String COMPLETE_OUTPUT = "complete";
-        public static final String UPDATES_OUTPUT = "update";
+        private static final String TO_MERGE_OUTPUT = "toMerge";
+        private static final String COMPLETE_OUTPUT = "complete";
+        private static final String UPDATES_OUTPUT = "update";
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> updatesCollector;
         
         private int KMER_SIZE;
         private MessageWritableNodeWithFlag inputValue;
@@ -243,16 +253,19 @@
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
+        	toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+        	completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+        	updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
                 if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                     if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
                         // complete path (H & T meet in this node)
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                        completeCollector.collect(key, inputValue);
                     } else { 
                         // FROM_SELF => no merging this round. remap self
-                        output.collect(key, inputValue);
+                        toMergeCollector.collect(key, inputValue);
                     }
                 } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
                     // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
@@ -308,9 +321,9 @@
                 outputValue.set(outFlag, curNode);
                 if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
-                    mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+                    completeCollector.collect(key, outputValue);
                 } else {
-                    output.collect(key, outputValue);
+                    toMergeCollector.collect(key, outputValue);
                 }
             }
         }
@@ -319,16 +332,17 @@
     /*
      * Run one iteration of the mergePaths algorithm
      */
-    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput, JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
 
-        FileInputFormat.addInputPath(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        FileInputFormat.addInputPaths(conf, inputPath);
+        Path outputPath = new Path(inputPath + ".h4merge.tmp");
+        FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
         conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
@@ -337,10 +351,24 @@
 
         conf.setMapperClass(MergePathsH4Mapper.class);
         conf.setReducerClass(MergePathsH4Reducer.class);
-
-        FileSystem.get(conf).delete(new Path(outputPath), true);
-
-        return JobClient.runJob(conf);
+        
+        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        
+        FileSystem dfs = FileSystem.get(conf); 
+        dfs.delete(outputPath, true); // clean output dir
+        RunningJob job = JobClient.runJob(conf);
+        
+        // move the tmp outputs to the arg-spec'ed dirs
+        dfs.rename(new Path(outputPath, MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(toMergeOutput));
+        dfs.rename(new Path(outputPath, MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(completeOutput));
+        dfs.rename(new Path(outputPath, MergePathsH4Reducer.UPDATES_OUTPUT), new Path(updatesOutput));
+        
+        return job;
     }
 
     @Override
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 497e926..2974d4c 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.genomix.hadoop.pmcommon;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -33,8 +34,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -48,6 +49,9 @@
  */
 @SuppressWarnings("deprecation")
 public class PathNodeInitial extends Configured implements Tool {
+	
+	public static final String COMPLETE_OUTPUT = "complete";
+    public static final String TO_MERGE_OUTPUT = "toMerge";
 
     public static class PathNodeInitialMapper extends MapReduceBase implements
             Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
@@ -71,6 +75,7 @@
         public void map(NodeWritable key, NullWritable value,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
+        	
             inDegree = key.inDegree();
             outDegree = key.outDegree();
             if (inDegree == 1 && outDegree == 1) {
@@ -132,46 +137,48 @@
 
     public static class PathNodeInitialReducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-        private MultipleOutputs mos;
-        private static final String COMPLETE_OUTPUT = "complete";
-        private int KMER_SIZE;
+    	protected MultipleOutputs mos;
+        protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+        protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+        protected int KMER_SIZE;
+        
         private MessageWritableNodeWithFlag inputValue;
         private MessageWritableNodeWithFlag outputValue;
         private NodeWritable nodeToKeep;
         private int count;
         private byte flag;
-        private boolean isComplete;
 
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            nodeToKeep = new NodeWritable(KMER_SIZE);
+            nodeToKeep = new NodeWritable(KMER_SIZE);            
         }
 
-        @SuppressWarnings("unchecked")
-        @Override
+        @SuppressWarnings("unchecked")         
+		@Override
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
-
+        	completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+        	toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+        	
             inputValue.set(values.next());
             if (!values.hasNext()) {
                 if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                     if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
                         // non-path node.  Store in "complete" output
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    	completeCollector.collect(key, inputValue);
                     } else {
                         // FROM_SELF => need to keep this PATH node
-                        output.collect(key, inputValue);
+                    	toMergeCollector.collect(key, inputValue);
                     }
                 }
             } else {
                 // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node 
                 count = 0;
                 flag = MessageFlag.EMPTY_MESSAGE;
-                isComplete = false;
                 while (true) { // process values; break when no more
                     count++;
                     if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
@@ -179,7 +186,7 @@
                         flag |= MessageFlag.FROM_SELF;
                         nodeToKeep.set(inputValue.getNode());
                         if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
-                            isComplete = true;
+                            flag |= MessageFlag.IS_COMPLETE;
                         }
                     } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         flag |= MessageFlag.IS_TAIL;
@@ -199,11 +206,11 @@
                 if ((flag & MessageFlag.FROM_SELF) > 0) {
                     if ((flag & MessageFlag.IS_COMPLETE) > 0) {
                         // non-path node.  Store in "complete" output
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    	completeCollector.collect(key, inputValue);
                     } else {
                         // only keep simple path nodes
                         outputValue.set(flag, nodeToKeep);
-                        output.collect(key, outputValue);
+                        toMergeCollector.collect(key, outputValue);
 
                         reporter.incrCounter("genomix", "path_nodes", 1);
                         if ((flag & MessageFlag.IS_HEAD) > 0) {
@@ -218,21 +225,26 @@
                 }
             }
         }
+        
+        public void close() throws IOException {
+            mos.close();
+        }
     }
 
     /*
      * Mark the head, tail, and simple path nodes in one map-reduce job.
      */
-    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(PathNodeInitial.class);
         conf.setJobName("PathNodeInitial " + inputPath);
 
-        FileInputFormat.addInputPath(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        FileInputFormat.addInputPaths(conf, inputPath);
+        Path outputPath = new Path(inputPath + ".initialMerge.tmp");
+        FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
         conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
@@ -241,16 +253,27 @@
 
         conf.setMapperClass(PathNodeInitialMapper.class);
         conf.setReducerClass(PathNodeInitialReducer.class);
+        
+        MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        		PositionWritable.class, MessageWritableNodeWithFlag.class);
 
-        FileSystem.get(conf).delete(new Path(outputPath), true);
-
-        return JobClient.runJob(conf);
+        FileSystem dfs = FileSystem.get(conf); 
+        dfs.delete(outputPath, true); // clean output dir
+        RunningJob job = JobClient.runJob(conf);
+        
+        // move the tmp outputs to the arg-spec'ed dirs
+        dfs.rename(new Path(outputPath, TO_MERGE_OUTPUT), new Path(toMergeOutput));
+        dfs.rename(new Path(outputPath, COMPLETE_OUTPUT), new Path(completeOutput));
+        
+        return job;
     }
 
     @Override
-    public int run(String[] arg0) throws Exception {
-        // TODO Auto-generated method stub
-        return 0;
+    public int run(String[] args) throws Exception {
+        run(args[0], args[1], args[2], new JobConf(getConf()));
+        return 0;
     }
 
     public static void main(String[] args) throws Exception {