Merge branch 'genomix/fullstack_genomix' into wbiesing/genomix/fullstack_genomix
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index ad8ead6..a7ca739 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -44,14 +44,14 @@
      * Heads send themselves to their successors, and all others map themselves.
      */
     private static class MergePathsH3Mapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private static long randSeed;
         private Random randGenerator;
         private float probBeingRandomHead;
 
         private int KMER_SIZE;
         private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private byte headFlag;
         private byte outFlag;
@@ -64,7 +64,7 @@
             finalMerge = conf.getBoolean("finalMerge", false);
 
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
             curNode = new NodeWritable(KMER_SIZE);
         }
@@ -76,8 +76,8 @@
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
             curNode = value.getNode();
             // Map all path vertices; Heads and pseudoheads are sent to their successors
@@ -103,7 +103,7 @@
                 output.collect(outputKey, outputValue);
             } else {
                 // tail nodes map themselves
-                outFlag |= MergeMessageFlag.FROM_SELF;
+                outFlag |= MergeMessageFlag.MSG_SELF;
                 outputValue.set(outFlag, curNode);
                 output.collect(key, outputValue);
             }
@@ -114,11 +114,11 @@
      * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
      */
     private static class MergePathsH3Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable headNode;
         private NodeWritable tailNode;
         private int count;
@@ -126,20 +126,20 @@
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             headNode = new NodeWritable(KMER_SIZE);
             tailNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
 
             inputValue = values.next();
             if (!values.hasNext()) {
                 // all single nodes must be remapped
-                if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) == MergeMessageFlag.FROM_SELF) {
+                if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) == MergeMessageFlag.MSG_SELF) {
                     // FROM_SELF => remap self
                     output.collect(key, inputValue);
                 } else {
@@ -202,9 +202,9 @@
         conf.setOutputFormat(SequenceFileOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(MergePathsH3Mapper.class);
         conf.setReducerClass(MergePathsH3Reducer.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 891be154..7e6df5e 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -29,11 +29,11 @@
 import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
 import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
 
 @SuppressWarnings("deprecation")
 public class MergePathsH4 extends Configured implements Tool {
@@ -43,14 +43,14 @@
      * Heads send themselves to their successors, and all others map themselves.
      */
     public static class MergePathsH4Mapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private static long randSeed;
         private Random randGenerator;
         private float probBeingRandomHead;
 
         private int KMER_SIZE;
         private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private PositionWritable curID;
         private PositionWritable nextID;
@@ -72,7 +72,7 @@
             probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
 
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
             curNode = new NodeWritable(KMER_SIZE);
             curID = new PositionWritable();
@@ -121,12 +121,12 @@
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
             // Node may be marked as head b/c it's a real head or a real tail
-            headFlag = (byte) (MergeMessageFlag.IS_HEAD & value.getFlag());
-            tailFlag = (byte) (MergeMessageFlag.IS_TAIL & value.getFlag());
+            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+            tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
             outFlag = (byte) (headFlag | tailFlag);
             
             // only PATH vertices are present. Find the ID's for my neighbors
@@ -140,14 +140,11 @@
             hasPrev = setPrevInfo(curNode) && headFlag == 0;
             willMerge = false;
             
-            reporter.setStatus("CHECK ME OUT");
-            System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
-
             // TODO: need to update edges in neighboring nodes
             
-            if ((outFlag & MergeMessageFlag.IS_HEAD) > 0 && (outFlag & MergeMessageFlag.IS_TAIL) > 0) {
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                 // true HEAD met true TAIL. this path is complete
-                outFlag |= MergeMessageFlag.FROM_SELF;
+                outFlag |= MessageFlag.MSG_SELF;
                 outputValue.set(outFlag, curNode);
                 output.collect(curID, outputValue);
                 return;
@@ -156,13 +153,13 @@
                 if (curHead) {
                     if (hasNext && !nextHead) {
                         // compress this head to the forward tail
-                        outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+                        outFlag |= MessageFlag.FROM_PREDECESSOR;
                         outputValue.set(outFlag, curNode);
                         output.collect(nextID, outputValue);
                         willMerge = true;
                     } else if (hasPrev && !prevHead) {
                         // compress this head to the reverse tail
-                        outFlag |= MergeMessageFlag.FROM_SUCCESSOR;
+                        outFlag |= MessageFlag.FROM_SUCCESSOR;
                         outputValue.set(outFlag, curNode);
                         output.collect(prevID, outputValue);
                         willMerge = true;
@@ -173,7 +170,7 @@
                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
-                            outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+                            outFlag |= MessageFlag.FROM_PREDECESSOR;
                             outputValue.set(outFlag, curNode);
                             output.collect(nextID, outputValue);
                             willMerge = true;
@@ -182,7 +179,7 @@
                         // no previous node
                         if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
-                            outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+                            outFlag |= MessageFlag.FROM_PREDECESSOR;
                             outputValue.set(outFlag, curNode);
                             output.collect(nextID, outputValue);
                             willMerge = true;
@@ -191,7 +188,7 @@
                         // no next node
                         if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
-                            outFlag |= MergeMessageFlag.FROM_SUCCESSOR;
+                            outFlag |= MessageFlag.FROM_SUCCESSOR;
                             outputValue.set(outFlag, curNode);
                             output.collect(prevID, outputValue);
                             willMerge = true;
@@ -202,7 +199,7 @@
 
             // if we didn't send ourselves to some other node, remap ourselves for the next round
             if (!willMerge) {
-                outFlag |= MergeMessageFlag.FROM_SELF;
+                outFlag |= MessageFlag.MSG_SELF;
                 outputValue.set(outFlag, curNode);
                 output.collect(curID, outputValue);
             }
@@ -217,18 +214,18 @@
      * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
      */
     private static class MergePathsH4Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
         private static final String TO_MERGE_OUTPUT = "toMerge";
         private static final String COMPLETE_OUTPUT = "complete";
         private static final String UPDATES_OUTPUT = "update";
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> updatesCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
         
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private NodeWritable prevNode;
         private NodeWritable nextNode;
@@ -241,8 +238,8 @@
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
@@ -250,8 +247,8 @@
 
         @SuppressWarnings("unchecked")
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
         	toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
         	completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
@@ -259,35 +256,35 @@
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
-                    if ((inputValue.getFlag() & MergeMessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MergeMessageFlag.IS_TAIL) > 0) {
+                if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
                         // complete path (H & T meet in this node)
                         completeCollector.collect(key, inputValue);
                     } else { 
                         // FROM_SELF => no merging this round. remap self
                         toMergeCollector.collect(key, inputValue);
                     }
-                } else if ((inputValue.getFlag() & (MergeMessageFlag.FROM_PREDECESSOR | MergeMessageFlag.FROM_SUCCESSOR)) > 0) {
+                } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
                     // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
                     throw new IOException("Only one value recieved in merge, but it wasn't from self!");
                 }
             } else {
                 // multiple inputs => a merge will take place. Aggregate all, then collect the merged path
                 count = 0;
-                outFlag = MergeMessageFlag.EMPTY_MESSAGE;
+                outFlag = MessageFlag.EMPTY_MESSAGE;
                 sawCurNode = false;
                 sawPrevNode = false;
                 sawNextNode = false;
                 while (true) { // process values; break when no more
                     count++;
-                    outFlag |= (inputValue.getFlag() & (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
-                    if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) > 0) {
+                    outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
+                    if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
                         prevNode.set(inputValue.getNode());
                         sawPrevNode = true;
-                    } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SUCCESSOR) > 0) {
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         nextNode.set(inputValue.getNode());
                         sawNextNode = true;
-                    } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
+                    } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
                         curNode.set(inputValue.getNode());
                         sawCurNode = true;
                     } else {
@@ -319,7 +316,7 @@
                 }
                 
                 outputValue.set(outFlag, curNode);
-                if ((outFlag & MergeMessageFlag.IS_HEAD) > 0 && (outFlag & MergeMessageFlag.IS_TAIL) > 0) {
+                if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
                     completeCollector.collect(key, outputValue);
                 } else {
@@ -349,19 +346,19 @@
         conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(MergePathsH4Mapper.class);
         conf.setReducerClass(MergePathsH4Reducer.class);
         
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        		PositionWritable.class, NodeWithFlagWritable.class);
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        		PositionWritable.class, NodeWithFlagWritable.class);
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, MessageWritableNodeWithFlag.class);
+        		PositionWritable.class, NodeWithFlagWritable.class);
         
         FileSystem dfs = FileSystem.get(conf); 
         // clean output dirs
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 5d4b886..4009fc6 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -37,22 +37,22 @@
      * Mapper class: removes any tips by not mapping them at all
      */
     private static class RemoveTipsMapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private int KMER_SIZE;
         private int removeTipsMinLength;
 
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
 
         public void configure(JobConf conf) {
             removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
             curNode.set(value.getNode());
             if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
@@ -60,7 +60,7 @@
                 // kill this node by NOT mapping it.  Update my neighbors with a suicide note
                 //TODO: update neighbors by removing me from its list
             } else {
-                outputValue.set(MergeMessageFlag.FROM_SELF, curNode);
+                outputValue.set(MergeMessageFlag.MSG_SELF, curNode);
                 output.collect(key, value);
             }
         }
@@ -70,11 +70,11 @@
      * Reducer class: keeps mapped nodes 
      */
     private static class MergePathsH4Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private NodeWritable prevNode;
         private NodeWritable nextNode;
@@ -86,20 +86,20 @@
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
+                if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) > 0) {
                     // FROM_SELF => keep self
                     output.collect(key, inputValue);
                 } else {
@@ -126,9 +126,9 @@
         conf.setOutputFormat(SequenceFileOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(MergePathsH4Mapper.class);
         conf.setReducerClass(MergePathsH4Reducer.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
deleted file mode 100644
index f05797e..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-/*
- * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
- * This class is used as the value in several MapReduce algorithms.
- */
-public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
-    private byte flag;
-    private NodeWritable node;
-
-    public MessageWritableNodeWithFlag() {
-        this(0);
-    }
-
-    public MessageWritableNodeWithFlag(int k) {
-        this.flag = 0;
-        this.node = new NodeWritable(k);
-    }
-
-    public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
-        this.flag = flag;
-        this.node = new NodeWritable(kmerSize);
-    }
-    
-    public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
-        this(node.getKmer().getKmerLength());
-        set(flag, node);
-    }
-
-    public void set(MessageWritableNodeWithFlag right) {
-        set(right.getFlag(), right.getNode());
-    }
-
-    public void set(byte flag, NodeWritable node) {
-        this.node.set(node);
-        this.flag = flag;
-    }
-
-    @Override
-    public void readFields(DataInput arg0) throws IOException {
-        node.readFields(arg0);
-        flag = arg0.readByte();
-    }
-
-    @Override
-    public void write(DataOutput arg0) throws IOException {
-        node.write(arg0);
-        arg0.writeByte(flag);
-    }
-
-    public NodeWritable getNode() {
-        if (node.getCount() != 0) {
-            return node;
-        }
-        return null;
-    }
-
-    public byte getFlag() {
-        return this.flag;
-    }
-
-    public String toString() {
-        return node.toString() + '\t' + String.valueOf(flag);
-    }
-
-    @Override
-    public byte[] getBytes() {
-        if (node.getCount() != 0) {
-            return node.getKmer().getBytes();
-        } else
-            return null;
-    }
-
-    @Override
-    public int getLength() {
-        return node.getCount();
-    }
-
-    @Override
-    public int hashCode() {
-//        return super.hashCode() + flag + node.hashCode();
-        return flag + node.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object rightObj) {
-        if (rightObj instanceof MessageWritableNodeWithFlag) {
-            MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
-            return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
-        }
-        return false;
-    }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
new file mode 100644
index 0000000..f929151
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -0,0 +1,186 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import javax.management.RuntimeErrorException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
+ * This class is used as the value in several MapReduce algorithms.
+ */
+public class NodeWithFlagWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
+    private byte flag;
+    private NodeWritable node;
+    
+    public static class MessageFlag { // named bit fields packed into the one-byte message flag
+        public static final byte EMPTY_MESSAGE = 0; // no bits set
+        // message types: a two-bit field in bits 0-1.  MSG_UPDATE_EDGE shares a bit with each of the other two,
+        public static final byte MSG_SELF = 0b01 << 0; // so a single-bit test on MSG_SELF also matches MSG_UPDATE_EDGE
+        public static final byte MSG_UPDATE_MERGE = 0b10 << 0; // likewise for a single-bit test on MSG_UPDATE_MERGE
+        public static final byte MSG_UPDATE_EDGE = 0b11 << 0;
+        public static final byte MSG_MASK = 0b11 << 0; // isolates the message-type field
+        // merge/update directions: a two-bit field in bits 2-3 (DIR_FF is the all-zero value)
+        public static final byte DIR_FF = 0b00 << 2;
+        public static final byte DIR_FR = 0b01 << 2;
+        public static final byte DIR_RF = 0b10 << 2;
+        public static final byte DIR_RR = 0b11 << 2;
+        public static final byte DIR_MASK = 0b11 << 2; // isolates the direction field
+        // additional info: independent single-bit markers
+        public static final byte IS_HEAD = 0b1 << 4; // bit 4
+        public static final byte IS_TAIL = 0b1 << 5; // bit 5
+        // extra bit used differently in each operation
+        public static final byte EXTRA_FLAG = 1 << 6; // bit 6
+    }
+    
+    /*
+     * Process any changes to @node contained in @updateMsg.  An edge update removes the update node's ID from the edge
+     * list picked by the direction bits and appends its four edge lists; a merge folds its kmer and edge list into @node.
+     */
+    public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
+        byte updateFlag = updateMsg.getFlag();
+        NodeWritable updateNode = updateMsg.getNode(); // null when the message's node has a count of 0
+        int msgType = updateFlag & MessageFlag.MSG_MASK; // compare the whole two-bit type field
+        if (msgType == MessageFlag.MSG_UPDATE_EDGE && updateNode != null) { // without a node there is nothing to apply
+            if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
+                switch (updateFlag & MessageFlag.DIR_MASK) { // remove updateNode from the specified PositionList
+                    case MessageFlag.DIR_FF:
+                        node.getFFList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_FR:
+                        node.getFRList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_RF:
+                        node.getRFList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_RR:
+                        node.getRRList().remove(updateNode.getNodeID());
+                        break;
+                    default:
+                        throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+                }
+            }
+            // now merge positionlists from update and node
+            node.getFFList().appendList(updateNode.getFFList());
+            node.getFRList().appendList(updateNode.getFRList());
+            node.getRFList().appendList(updateNode.getRFList());
+            node.getRRList().appendList(updateNode.getRRList());
+        } else if (msgType == MessageFlag.MSG_UPDATE_MERGE) {
+            if (updateNode == null) { // fail with a readable error instead of a NullPointerException below
+                throw new IOException("Merge message carries no node to merge with! updateFlag: " + updateFlag);
+            }
+            switch (updateFlag & MessageFlag.DIR_MASK) { // the direction bits say how the merge takes place
+                case MessageFlag.DIR_FF:
+                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+                    node.getFFList().set(updateNode.getFFList());
+                    break;
+                case MessageFlag.DIR_FR:
+                    // FIXME not sure if this should be reverse-complement or just reverse...
+                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+                    node.getFRList().set(updateNode.getFRList());
+                    break;
+                case MessageFlag.DIR_RF:
+                    // TODO RF merge is not implemented yet; such a message currently leaves node unchanged
+                    break;
+                case MessageFlag.DIR_RR:
+                    node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+                    node.getRRList().set(updateNode.getRRList());
+                    break;
+                default:
+                    throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+            }
+        }
+    }
+
+    public NodeWithFlagWritable() { // no-arg constructor: delegates with a kmer size of 0
+        this(0);
+    }
+
+    public NodeWithFlagWritable(int k) { // starts with flag 0 and a fresh NodeWritable built with k
+        this.flag = 0;
+        this.node = new NodeWritable(k);
+    }
+
+    public NodeWithFlagWritable(byte flag, int kmerSize) { // given flag plus a fresh NodeWritable built with kmerSize
+        this.flag = flag;
+        this.node = new NodeWritable(kmerSize);
+    }
+    
+    public NodeWithFlagWritable(byte flag, NodeWritable node) { // node must be non-null: its kmer length is read first
+        this(node.getKmer().getKmerLength()); // size the internal node from the argument's kmer length
+        set(flag, node); // then take over the given flag and node contents
+    }
+
+    public void set(NodeWithFlagWritable right) { // copy right's flag and node into this message
+        set(right.flag, right.node); // read the field: getNode() returns null when the node's count is 0
+    }
+
+    public void set(byte flag, NodeWritable node) { // overwrite this message's flag and node contents
+        this.node.set(node); // updates the NodeWritable this object already owns; this.node is not reassigned
+        this.flag = flag;
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException { // reads in the same order write() emits
+        node.readFields(arg0); // node first
+        flag = arg0.readByte(); // then the single flag byte
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException { // writes in the same order readFields() consumes
+        node.write(arg0); // node first
+        arg0.writeByte(flag); // then the single flag byte
+    }
+
+    public NodeWritable getNode() { // returns the wrapped node, or null when its count is 0
+        if (node.getCount() != 0) {
+            return node; // the internal instance itself, not a copy
+        }
+        return null; // callers must be prepared for a null result
+    }
+
+    public byte getFlag() { // returns the raw flag byte (see MessageFlag for the bit constants)
+        return this.flag;
+    }
+
+    public String toString() { // node's string form, a tab, then the flag as a decimal number
+        return node.toString() + '\t' + String.valueOf(flag);
+    }
+
+    @Override
+    public byte[] getBytes() { // bytes of the node's kmer, or null when the node's count is 0
+        if (node.getCount() == 0) {
+            return null;
+        }
+        return node.getKmer().getBytes();
+    }
+
+    @Override
+    public int getLength() { // reports the node's count as this comparable's length
+        return node.getCount();
+    }
+
+    @Override
+    public int hashCode() {
+        // built from the same two fields equals() compares; super.hashCode() is not mixed in
+        return flag + node.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object rightObj) { // equal only to a NodeWithFlagWritable with the same flag and an equal node
+        if (!(rightObj instanceof NodeWithFlagWritable)) {
+            return false;
+        }
+        NodeWithFlagWritable other = (NodeWithFlagWritable) rightObj;
+        return flag == other.flag && node.equals(other.node);
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 27058ed..0523812 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -58,17 +59,10 @@
     public static final String TO_MERGE_OUTPUT = "toMerge";
     public static final String TO_UPDATE_OUTPUT = "toUpdate";
 
-    public static class PathNodeFlag {
-        public static final byte EMPTY_MESSAGE = 0;
-        public static final byte FROM_SELF = 1 << 0;
-        public static final byte IS_HEAD = 1 << 1;
-        public static final byte IS_TAIL = 1 << 2;
-        public static final byte IS_COMPLETE = 1 << 3;
-        public static final byte NEAR_PATH = 1 << 4;
-    }
+    private static final byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
 
-    private static void sendOutputToNextNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
-            OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+    private static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+            OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
         Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
         while (posIterator.hasNext()) {
             collector.collect(posIterator.next(), outputValue);
@@ -79,8 +73,8 @@
         }
     }
 
-    private static void sendOutputToPreviousNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
-            OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+    private static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+            OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
         Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
         while (posIterator.hasNext()) {
             collector.collect(posIterator.next(), outputValue);
@@ -92,18 +86,18 @@
     }
 
     public static class PathNodeInitialMapper extends MapReduceBase implements
-            Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
         private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private int inDegree;
         private int outDegree;
         private boolean pathNode;
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
         }
 
@@ -115,8 +109,7 @@
          */
         @Override
         public void map(NodeWritable key, NullWritable value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
             inDegree = key.inDegree();
             outDegree = key.outDegree();
             if (inDegree == 1 && outDegree == 1) {
@@ -124,38 +117,38 @@
             } else if (inDegree == 0 && outDegree == 1) {
                 pathNode = true;
                 // start of a tip.  needs to merge & be marked as head
-                outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+                outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
                 output.collect(key.getNodeID(), outputValue);
             } else if (inDegree == 1 && outDegree == 0) {
                 pathNode = true;
                 // end of a tip.  needs to merge & be marked as tail
-                outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+                outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
                 output.collect(key.getNodeID(), outputValue);
             } else {
                 pathNode = false;
                 if (outDegree > 0) {
                     // Not a path myself, but my successor might be one. Map forward successor to find heads
-                    outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+                    outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
                     sendOutputToNextNeighbors(key, outputValue, output);
                 }
                 if (inDegree > 0) {
                     // Not a path myself, but my predecessor might be one. map predecessor to find tails 
-                    outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+                    outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
                     sendOutputToPreviousNeighbors(key, outputValue, output);
                 }
-                // this non-path node won't participate in the merge
-                outputValue.set((byte) (PathNodeFlag.FROM_SELF | PathNodeFlag.IS_COMPLETE), key);
+                // this non-path node won't participate in the merge. Mark as "complete" (H + T)
+                outputValue.set((byte) (MessageFlag.MSG_SELF | MessageFlag.IS_HEAD | MessageFlag.IS_TAIL), key);
                 output.collect(key.getNodeID(), outputValue);
             }
 
             if (pathNode) {
                 // simple path nodes map themselves
-                outputValue.set(PathNodeFlag.FROM_SELF, key);
+                outputValue.set(MessageFlag.MSG_SELF, key);
                 output.collect(key.getNodeID(), outputValue);
                 reporter.incrCounter("genomix", "path_nodes", 1);
 
                 // also mark neighbors of paths (they are candidates for updates)
-                outputValue.set(PathNodeFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
+                outputValue.set(NEAR_PATH, NodeWritable.EMPTY_NODE);
                 sendOutputToNextNeighbors(key, outputValue, output);
                 sendOutputToPreviousNeighbors(key, outputValue, output);
             }
@@ -163,24 +156,24 @@
     }
 
     public static class PathNodeInitialReducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
-        private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toUpdateCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
         private int KMER_SIZE;
 
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable nodeToKeep;
         private byte outputFlag;
         private byte inputFlag;
+        private boolean sawSelf;
 
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             nodeToKeep = new NodeWritable(KMER_SIZE);
         }
 
@@ -195,54 +188,54 @@
          */
         @SuppressWarnings("unchecked")
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
                 throws IOException {
             completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
-            toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
             toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
 
-            outputFlag = PathNodeFlag.EMPTY_MESSAGE;
+            outputFlag = MessageFlag.EMPTY_MESSAGE;
+            sawSelf = false;
             while (values.hasNext()) {
                 inputValue.set(values.next());
                 inputFlag = inputValue.getFlag();
                 outputFlag |= inputFlag;
 
-                if ((inputFlag & PathNodeFlag.FROM_SELF) > 0) {
+                if ((inputFlag & MessageFlag.MSG_SELF) > 0) {
                     // SELF -> keep this node
+                    if (sawSelf) {
+                        throw new IOException("Already saw SELF node in PathNodeInitialReducer! previous self: "
+                                + nodeToKeep.toString() + ". current self: " + inputValue.getNode().toString());
+                    }
+                    sawSelf = true;
                     nodeToKeep.set(inputValue.getNode());
                 }
             }
 
-            if ((outputFlag & PathNodeFlag.FROM_SELF) > 0) {
-                if ((outputFlag & PathNodeFlag.IS_COMPLETE) > 0) {
-                    if ((outputFlag & PathNodeFlag.NEAR_PATH) > 0) {
-                        // non-path, but update candidate
-                        outputValue.set(PathNodeFlag.NEAR_PATH, nodeToKeep);
+            if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+                if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+                    // non-path or single path nodes
+                    if ((outputFlag & NEAR_PATH) > 0) {
+                        // non-path, but an update candidate
+                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
                         toUpdateCollector.collect(key, outputValue);
                     } else {
-                        // non-path node.  Store in "complete" output
-                        outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
+                        // non-path or single-node path.  Store in "complete" output
+                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
                         completeCollector.collect(key, outputValue);
                     }
                 } else {
-                    if ((outputFlag & PathNodeFlag.IS_HEAD) > 0 && (outputFlag & PathNodeFlag.IS_TAIL) > 0) {
-                        // path nodes marked as H & T are single-node paths (not mergeable, not updateable)
-                        outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
-                        completeCollector.collect(key, outputValue);
-                    } else {
-                        // path nodes that are mergeable
-                        outputFlag &= (PathNodeFlag.IS_HEAD | PathNodeFlag.IS_TAIL); // clear flags except H/T 
-                        outputValue.set(outputFlag, nodeToKeep);
-                        toMergeCollector.collect(key, outputValue);
+                    // path nodes that are mergeable
+                    outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+                    outputValue.set(outputFlag, nodeToKeep);
+                    toMergeCollector.collect(key, outputValue);
 
-                        reporter.incrCounter("genomix", "path_nodes", 1);
-                        if ((outputFlag & PathNodeFlag.IS_HEAD) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_heads", 1);
-                        }
-                        if ((outputFlag & PathNodeFlag.IS_TAIL) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                        }
+                    reporter.incrCounter("genomix", "path_nodes", 1);
+                    if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                    }
+                    if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
                     }
                 }
             } else {
@@ -265,35 +258,34 @@
         conf.setJobName("PathNodeInitial " + inputPath);
 
         FileInputFormat.addInputPaths(conf, inputPath);
-        Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+        //        Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+        Path outputPath = new Path(toMergeOutput);
         FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
         conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(PathNodeInitialMapper.class);
         conf.setReducerClass(PathNodeInitialReducer.class);
 
-        MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, MessageWritableNodeWithFlag.class);
         MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, MessageWritableNodeWithFlag.class);
+                PositionWritable.class, NodeWithFlagWritable.class);
         MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, MessageWritableNodeWithFlag.class);
+                PositionWritable.class, NodeWithFlagWritable.class);
 
         FileSystem dfs = FileSystem.get(conf);
         dfs.delete(outputPath, true); // clean output dir
         RunningJob job = JobClient.runJob(conf);
 
         // move the tmp outputs to the arg-spec'ed dirs
-        dfs.rename(new Path(outputPath + File.separator + TO_MERGE_OUTPUT), new Path(toMergeOutput));
         dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
         dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+        //        dfs.rename(outputPath, new Path(toMergeOutput));
 
         return job;
     }
@@ -305,7 +297,7 @@
     }
 
     public static void main(String[] args) throws Exception {
-        int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+        int res = new PathNodeInitial().run(args);
         System.exit(res);
     }
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
index 8f0be6c..f24fdc1 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
@@ -14,7 +14,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
@@ -41,9 +41,9 @@
     public void testNoNeighbors() throws IOException {
         NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
                 new PositionListWritable(), new PositionListWritable(), kmer);
-        MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), noNeighborNode);
+        NodeWithFlagWritable output = new NodeWithFlagWritable((byte) (MergeMessageFlag.MSG_SELF | MergeMessageFlag.IS_COMPLETE), noNeighborNode);
         // test mapper
-        new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
+        new MapDriver<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable>()
                 .withMapper(new PathNodeInitial.PathNodeInitialMapper())
                 .withConfiguration(conf)
                 .withInput(noNeighborNode, NullWritable.get())
@@ -51,7 +51,7 @@
                 .runTest();
         // test reducer
 //        MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
-        new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
+        new ReduceDriver<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable>()
                 .withReducer(new PathNodeInitial.PathNodeInitialReducer())
                 .withConfiguration(conf)
                 .withInput(posn1, Arrays.asList(output))