test cases
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 291839c..6870ff1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -254,4 +254,8 @@
         return inDegree() == 1 && outDegree() == 1;
     }
 
+    public boolean isSimpleOrTerminalPath() {
+        return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
+    }
+
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index ae921c4..3d93baa 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -37,18 +38,14 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
-import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -90,8 +87,8 @@
         private PositionWritable curID;
         private PositionWritable nextID;
         private PositionWritable prevID;
-        private boolean hasNext;
-        private boolean hasPrev;
+        private boolean mergeableNext;
+        private boolean mergeablePrev;
         private boolean curHead;
         private boolean nextHead;
         private boolean prevHead;
@@ -124,6 +121,11 @@
         protected boolean isNodeRandomHead(PositionWritable nodeID) {
             // "deterministically random", based on node id
             randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+
+            // similar hashcodes will produce similar initial random values.  Burn through a few to increase spread
+            for (int i = 0; i < 100; i++) {
+                randGenerator.nextFloat();
+            }
             return randGenerator.nextFloat() < probBeingRandomHead;
         }
 
@@ -171,7 +173,7 @@
             inFlag = value.getFlag();
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
-
+            mergeDir = MergeDir.NO_MERGE; // no merge to happen
             headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
             tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
             mergeMsgFlag = (byte) (headFlag | tailFlag);
@@ -179,41 +181,41 @@
             curHead = isNodeRandomHead(curID);
             // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
             // We prevent merging towards non-path nodes
-            hasNext = setNextInfo(curNode) && tailFlag == 0;
-            hasPrev = setPrevInfo(curNode) && headFlag == 0;
-            mergeDir = MergeDir.NO_MERGE; // no merge to happen
+            boolean isPath = curNode.isSimpleOrTerminalPath();
+            mergeableNext = setNextInfo(curNode) && tailFlag == 0;
+            mergeablePrev = setPrevInfo(curNode) && headFlag == 0;
 
             // decide where we're going to merge to
-            if (hasNext || hasPrev) {
+            if (isPath && (mergeableNext || mergeablePrev)) {
                 if (curHead) {
-                    if (hasNext && !nextHead) {
+                    if (mergeableNext && !nextHead) {
                         // merge forward
                         mergeMsgFlag |= nextDir;
                         mergeDir = MergeDir.FORWARD;
-                    } else if (hasPrev && !prevHead) {
+                    } else if (mergeablePrev && !prevHead) {
                         // merge backwards
                         mergeMsgFlag |= prevDir;
                         mergeDir = MergeDir.BACKWARD;
                     }
                 } else {
                     // I'm a tail
-                    if (hasNext && hasPrev) {
-                        if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+                    if (mergeableNext && mergeablePrev) {
+                        if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
                             mergeMsgFlag |= nextDir;
                             mergeDir = MergeDir.FORWARD;
                         }
-                    } else if (!hasPrev) {
+                    } else if (!mergeablePrev) {
                         // no previous node
-                        if (!nextHead && curID.compareTo(nextID) < 0) {
+                        if (!nextHead && curID.compareTo(nextID) > 0) {
                             // merge towards tail in forward dir
                             mergeMsgFlag |= nextDir;
                             mergeDir = MergeDir.FORWARD;
                         }
-                    } else if (!hasNext) {
+                    } else if (!mergeableNext) {
                         // no next node
-                        if (!prevHead && curID.compareTo(prevID) < 0) {
+                        if (!prevHead && curID.compareTo(prevID) > 0) {
                             // merge towards tail in reverse dir
                             mergeMsgFlag |= prevDir;
                             mergeDir = MergeDir.BACKWARD;
@@ -287,10 +289,8 @@
         private int KMER_SIZE;
         private NodeWithFlagWritable inputValue;
         private NodeWithFlagWritable outputValue;
-        private NodeWritable curNode;
         private PositionWritable outPosn;
         private boolean sawCurNode;
-        private byte outFlag;
         private byte inFlag;
 
         // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
@@ -302,19 +302,26 @@
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
-            curNode = new NodeWritable(KMER_SIZE);
             outPosn = new PositionWritable();
             updateMsgs = new ArrayList<NodeWithFlagWritable>();
             updateMsgsSize = updateMsgs.size();
         }
 
+        private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+            updateMsgsCount++;
+            if (updateMsgsCount >= updateMsgsSize) {
+                updateMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of inputValue-- not a reference!
+            } else {
+                updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
+            }
+        }
+
         /*
          * Process updates from mapper
          * 
          * (non-Javadoc)
          * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
          */
-        @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
                 OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
@@ -326,22 +333,23 @@
                 inputValue.set(values.next());
                 inFlag = inputValue.getFlag();
                 inMsg = (byte) (inFlag & MessageFlag.MSG_MASK);
-                
+
                 switch (inMsg) {
                     case MessageFlag.MSG_UPDATE_MERGE:
                     case MessageFlag.MSG_SELF:
                         if (sawCurNode)
-                            throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
-                                    + "  current self: " + inputValue.getNode());
-                        curNode.set(inputValue.getNode());
-                        outFlag = inFlag;
-                        sawCurNode = true;
+                            throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+                                    + outputValue.getNode() + "  current self: " + inputValue.getNode());
                         if (inMsg == MessageFlag.MSG_SELF) {
-                            outPosn.set(curNode.getNodeID());
-                        } else { // MSG_UPDATE_MERGE
+                            outPosn.set(outputValue.getNode().getNodeID());
+                        } else if (inMsg == MessageFlag.MSG_UPDATE_MERGE) {
                             // merge messages are sent to their merge recipient
-                            outPosn.set(curNode.getListFromDir(inMsg).getPosition(0));
+                            outPosn.set(outputValue.getNode().getListFromDir(inMsg).getPosition(0));
+                        } else {
+                            throw new IOException("Unrecongized MessageFlag MSG: " + inMsg);
                         }
+                        outputValue.set(inFlag, inputValue.getNode());
+                        sawCurNode = true;
                         break;
                     case MessageFlag.MSG_UPDATE_EDGE:
                         addUpdateMessage(inputValue);
@@ -355,196 +363,111 @@
             }
 
             // process all the update messages for this node
-            // I have no idea how to make this more efficient...
-            for (int i=0; i < updateMsgsCount; i++) {
-                NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
+            for (int i = 0; i < updateMsgsCount; i++) {
+                outputValue.processUpdates(updateMsgs.get(i), KMER_SIZE);
             }
-            outputValue.set(outFlag, curNode);
             output.collect(outPosn, outputValue);
         }
-
-        private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
-            updateMsgsCount++;
-            if (updateMsgsCount >= updateMsgsSize) {
-                updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
-            } else {
-                updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
-            }
-        }
     }
 
     /*
-     * Mapper class: sends the update messages to their (already decided) destination
+     * Reducer class: processes merge messages 
      */
-    public static class H4MergeMapper extends MapReduceBase implements
-            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
-        private static long randSeed;
-        private Random randGenerator;
-        private float probBeingRandomHead;
-
-        private int KMER_SIZE;
-        private NodeWithFlagWritable outputValue;
-        private NodeWithFlagWritable mergeMsgValue;
-        private NodeWithFlagWritable updateMsgValue;
-
-        private NodeWritable curNode;
-        private PositionWritable curID;
-        private PositionWritable nextID;
-        private PositionWritable prevID;
-        private boolean hasNext;
-        private boolean hasPrev;
-        private boolean curHead;
-        private boolean nextHead;
-        private boolean prevHead;
-        private MergeDir mergeDir;
-        private byte inFlag;
-        private byte headFlag;
-        private byte tailFlag;
-        private byte mergeMsgFlag;
-        private byte nextDir;
-        private byte prevDir;
-
-        public void configure(JobConf conf) {
-
-            randSeed = conf.getLong("randomSeed", 0);
-            randGenerator = new Random(randSeed);
-            probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
-
-            KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new NodeWithFlagWritable(KMER_SIZE);
-
-            mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
-            updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
-
-            curNode = new NodeWritable(KMER_SIZE);
-            curID = new PositionWritable();
-            nextID = new PositionWritable();
-            prevID = new PositionWritable();
-        }
-
-        @Override
-        public void map(PositionWritable key, NodeWithFlagWritable value,
-                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
-            inFlag = value.getFlag();
-            curNode.set(value.getNode());
-            curID.set(curNode.getNodeID());
-
-        }
-
-    }
-
-    /*
-     * Reducer class: processes the update messages from updateMapper
-     */
-    private static class H4MergeReducer2 extends MapReduceBase implements
+    private static class H4MergeReducer extends MapReduceBase implements
             Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
-        private static final String TO_MERGE_OUTPUT = "toMerge";
-        private static final String COMPLETE_OUTPUT = "complete";
-        private static final String UPDATES_OUTPUT = "update";
-        private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
-        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
-        private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
+        public static final String TO_UPDATE_OUTPUT = "toUpdate";
+        public static final String COMPLETE_OUTPUT = "complete";
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
+        private OutputCollector<NodeWritable, NullWritable> completeCollector;
 
         private int KMER_SIZE;
         private NodeWithFlagWritable inputValue;
         private NodeWithFlagWritable outputValue;
-        private NodeWritable curNode;
-        private NodeWritable prevNode;
-        private NodeWritable nextNode;
+        private PositionWritable outputKey;
         private boolean sawCurNode;
-        private boolean sawPrevNode;
-        private boolean sawNextNode;
-        private int count;
-        private byte outFlag;
+        private byte inFlag;
+
+        // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
+        private ArrayList<NodeWithFlagWritable> mergeMsgs;
+        private int updateMsgsSize;
+        private int mergeMsgsCount;
 
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
-            curNode = new NodeWritable(KMER_SIZE);
-            prevNode = new NodeWritable(KMER_SIZE);
-            nextNode = new NodeWritable(KMER_SIZE);
+            outputKey = new PositionWritable();
+            mergeMsgs = new ArrayList<NodeWithFlagWritable>();
+            updateMsgsSize = mergeMsgs.size();
         }
 
+        private void addMergeMessage(NodeWithFlagWritable myInputValue) {
+            mergeMsgsCount++;
+            if (mergeMsgsCount >= updateMsgsSize) {
+                mergeMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of inputValue-- not a reference!
+            } else {
+                mergeMsgs.get(mergeMsgsCount - 1).set(myInputValue); // update existing reference
+            }
+        }
+
+        /*
+         * Process merges
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
         @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
-                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
-            toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+                OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
+                throws IOException {
+            toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
             completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
-            updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
+            sawCurNode = false;
+            mergeMsgsCount = 0;
 
-            inputValue.set(values.next());
-            if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
-                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
-                            && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
-                        // complete path (H & T meet in this node)
-                        completeCollector.collect(key, inputValue);
-                    } else {
-                        // FROM_SELF => no merging this round. remap self
-                        toMergeCollector.collect(key, inputValue);
-                    }
-                } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
-                    // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
-                    throw new IOException("Only one value recieved in merge, but it wasn't from self!");
-                }
-            } else {
-                // multiple inputs => a merge will take place. Aggregate all, then collect the merged path
-                count = 0;
-                outFlag = MessageFlag.EMPTY_MESSAGE;
-                sawCurNode = false;
-                sawPrevNode = false;
-                sawNextNode = false;
-                while (true) { // process values; break when no more
-                    count++;
-                    outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
-                    if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
-                        prevNode.set(inputValue.getNode());
-                        sawPrevNode = true;
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
-                        nextNode.set(inputValue.getNode());
-                        sawNextNode = true;
-                    } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
-                        curNode.set(inputValue.getNode());
+            while (values.hasNext()) {
+                inputValue.set(values.next());
+                inFlag = inputValue.getFlag();
+                switch (inFlag & MessageFlag.MSG_MASK) {
+                    case MessageFlag.MSG_SELF:
+                        if (sawCurNode)
+                            throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+                                    + outputValue.getNode() + "  current self: " + inputValue.getNode());
+                        outputKey.set(outputValue.getNode().getNodeID());
+                        outputValue.set(inFlag, inputValue.getNode());
                         sawCurNode = true;
-                    } else {
-                        throw new IOException("Unknown origin for merging node");
-                    }
-                    if (!values.hasNext()) {
                         break;
-                    } else {
-                        inputValue.set(values.next());
-                    }
+                    case MessageFlag.MSG_UPDATE_MERGE:
+                        addMergeMessage(inputValue);
+                        break;
+                    case MessageFlag.MSG_UPDATE_EDGE:
+                        throw new IOException("Error: update message recieved during merge phase!" + inputValue);
+                    default:
+                        throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
                 }
-                if (count != 2 && count != 3) {
-                    throw new IOException("Expected two or three nodes in MergePathsH4 reduce; saw "
-                            + String.valueOf(count));
-                }
-                if (!sawCurNode) {
-                    throw new IOException("Didn't see node from self in MergePathsH4 reduce!");
-                }
+            }
+            if (!sawCurNode) {
+                throw new IOException("Never saw self in recieve update messages!");
+            }
 
-                // merge any received nodes
-                if (sawNextNode) {
-                    curNode.mergeForwardNext(nextNode, KMER_SIZE);
-                    reporter.incrCounter("genomix", "num_merged", 1);
-                }
-                if (sawPrevNode) {
-                    // TODO: fix this merge command!  which one is the right one?
-                    curNode.mergeForwardPre(prevNode, KMER_SIZE);
-                    reporter.incrCounter("genomix", "num_merged", 1);
-                }
+            // process all the merge messages for this node
+            for (int i = 0; i < mergeMsgsCount; i++) {
+                outputValue.processUpdates(mergeMsgs.get(i), KMER_SIZE);
+            }
 
-                outputValue.set(outFlag, curNode);
-                if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
-                    // True heads meeting tails => merge is complete for this node
-                    completeCollector.collect(key, outputValue);
-                } else {
-                    toMergeCollector.collect(key, outputValue);
-                }
+            if (!outputValue.getNode().isSimpleOrTerminalPath()) {
+                // not a mergeable path, can't tell if it still needs updates!
+                toUpdateCollector.collect(outputKey, outputValue);
+            } else if ((outputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+                    && ((outputValue.getFlag() & MessageFlag.IS_TAIL) > 0)) {
+                // H + T indicates a complete path
+                completeCollector.collect(outputValue.getNode(), NullWritable.get());
+            } else {
+                // not finished merging yet
+                toMergeCollector.collect(outputKey, outputValue);
             }
         }
 
@@ -556,11 +479,13 @@
     /*
      * Run one iteration of the mergePaths algorithm
      */
-    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
-            JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String toUpdateOutput, String completeOutput, JobConf baseConf)
+            throws IOException {
         JobConf conf = new JobConf(baseConf);
+        FileSystem dfs = FileSystem.get(conf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
+<<<<<<< HEAD
         
         //another comment
 
@@ -568,45 +493,51 @@
         Path outputPath = new Path(inputPath + ".h4merge.tmp");
         FileOutputFormat.setOutputPath(conf, outputPath);
 
+=======
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
         conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(NullOutputFormat.class);
-
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
         conf.setMapOutputKeyClass(PositionWritable.class);
         conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
         conf.setOutputValueClass(NodeWithFlagWritable.class);
 
+        // step 1: decide merge dir and send updates
+        FileInputFormat.addInputPaths(conf, inputPath);
+        String outputUpdatesTmp = "h4.updatesProcessed." + new Random().nextDouble() + ".tmp"; // random filename
+        FileOutputFormat.setOutputPath(conf, new Path(outputUpdatesTmp));
+        dfs.delete(new Path(outputUpdatesTmp), true);
         conf.setMapperClass(H4UpdatesMapper.class);
         conf.setReducerClass(H4UpdatesReducer.class);
-
-        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, NodeWithFlagWritable.class);
-        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, NodeWithFlagWritable.class);
-        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, NodeWithFlagWritable.class);
-
-        FileSystem dfs = FileSystem.get(conf);
-        // clean output dirs
-        dfs.delete(outputPath, true);
-        dfs.delete(new Path(toMergeOutput), true);
-        dfs.delete(new Path(completeOutput), true);
-        dfs.delete(new Path(updatesOutput), true);
-
         RunningJob job = JobClient.runJob(conf);
 
-        // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
-        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
-                toMergeOutput))) {
-            dfs.mkdirs(new Path(toMergeOutput));
+        // step 2: process merges
+        FileInputFormat.addInputPaths(conf, outputUpdatesTmp);
+        for (Path out : FileInputFormat.getInputPaths(conf)) {
+            System.out.println(out);
         }
-        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
+        Path outputMergeTmp = new Path("h4.mergeProcessed." + new Random().nextDouble() + ".tmp"); // random filename
+        FileOutputFormat.setOutputPath(conf, outputMergeTmp);
+        MultipleOutputs.addNamedOutput(conf, H4MergeReducer.TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
+        MultipleOutputs.addNamedOutput(conf, H4MergeReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                NodeWritable.class, NullWritable.class);
+        dfs.delete(outputMergeTmp, true);
+        conf.setMapperClass(IdentityMapper.class);
+        conf.setReducerClass(H4MergeReducer.class);
+        job = JobClient.runJob(conf);
+
+        // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
+        if (!dfs.rename(new Path(outputMergeTmp + File.separator + H4MergeReducer.TO_UPDATE_OUTPUT), new Path(
+                toUpdateOutput))) {
+            dfs.mkdirs(new Path(toUpdateOutput));
+        }
+        if (!dfs.rename(new Path(outputMergeTmp + File.separator + H4MergeReducer.COMPLETE_OUTPUT), new Path(
                 completeOutput))) {
             dfs.mkdirs(new Path(completeOutput));
         }
-        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
-                new Path(updatesOutput))) {
-            dfs.mkdirs(new Path(updatesOutput));
+        if (!dfs.rename(outputMergeTmp, new Path(toMergeOutput))) {
+            dfs.mkdirs(new Path(toMergeOutput));
         }
 
         return job;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index a3ea666..2765920 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,32 +26,30 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
+import edu.uci.ics.genomix.hadoop.pmcommon.ConvertGraphFromNodeWithFlagToNodeWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
 
 @SuppressWarnings("deprecation")
 public class MergePathsH4Driver {
 
     private static final String TO_MERGE = "toMerge";
+    private static final String TO_UPDATE = "toUpdate";
     private static final String COMPLETE = "complete";
-    private static final String UPDATES = "updates";
     private String mergeOutput;
+    private String toUpdateOutput;
     private String completeOutput;
-    private String updatesOutput;
 
     private void setOutputPaths(String basePath, int mergeIteration) {
         basePath = basePath.replaceAll("/$", ""); // strip trailing slash
         mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+        toUpdateOutput = basePath + "_" + TO_UPDATE + "_i" + mergeIteration;
         completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
-        updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
     }
 
     private static class Options {
         @Option(name = "-inputpath", usage = "the input path", required = true)
         public String inputPath;
 
-        @Option(name = "-outputpath", usage = "the output path", required = true)
-        public String outputPath;
-
         @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
         public String mergeResultPath;
 
@@ -74,8 +73,8 @@
      * iterations of path merging. Updates during the merge are batch-processed
      * at the end in a final update job.
      */
-    public void run(String inputGraphPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
-            String defaultConfPath, JobConf defaultConf) throws IOException {
+    public String run(String inputGraphPath, int numReducers, int sizeKmer, int mergeRound, String defaultConfPath,
+            JobConf defaultConf) throws IOException {
         JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
         if (defaultConfPath != null) {
             baseConf.addResource(new Path(defaultConfPath));
@@ -85,31 +84,35 @@
         FileSystem dfs = FileSystem.get(baseConf);
 
         int iMerge = 0;
+        boolean mergeComplete = false;
+        String prevToMergeOutput = inputGraphPath;
+        ArrayList<String> completeOutputs = new ArrayList<String>();
 
         // identify head and tail nodes with pathnode initial
         PathNodeInitial inith4 = new PathNodeInitial();
         setOutputPaths(inputGraphPath, iMerge);
-        String prevToMergeOutput = inputGraphPath;
-        System.out.println("initial run.  toMerge: " + mergeOutput + ", complete: " + completeOutput);
-        inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
-        dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
-        dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
+        inith4.run(prevToMergeOutput, mergeOutput, toUpdateOutput, completeOutput, baseConf);
+        completeOutputs.add(completeOutput);
+        //        dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
+        //        dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
 
         // several iterations of merging
         MergePathsH4 merger = new MergePathsH4();
         for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
             prevToMergeOutput = mergeOutput;
             setOutputPaths(inputGraphPath, iMerge);
-            merger.run(prevToMergeOutput, mergeOutput, completeOutput, updatesOutput, baseConf);
-//            dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
-//            dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
-//            dfs.copyToLocalFile(new Path(updatesOutput), new Path("i" + iMerge +"-updates"));
-            
+            merger.run(prevToMergeOutput, mergeOutput, toUpdateOutput, completeOutput, baseConf);
+            completeOutputs.add(completeOutput);
+            //            dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
+            //            dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
+
             if (dfs.listStatus(new Path(mergeOutput)) == null || dfs.listStatus(new Path(mergeOutput)).length == 0) {
                 // no output from previous run-- we are done!
+                mergeComplete = true;
                 break;
             }
         }
+<<<<<<< HEAD
         
         // finally, combine all the completed paths and update messages to
         // create a single merged graph output
@@ -130,25 +133,34 @@
         };
         // test comment 
         
+=======
+        if (!mergeComplete) {
+            // if the merge didn't finish, we have to do one final iteration to convert back into (NodeWritable, NullWritable) pairs
+            ConvertGraphFromNodeWithFlagToNodeWritable converter = new ConvertGraphFromNodeWithFlagToNodeWritable();
+            converter.run(prevToMergeOutput, completeOutput, baseConf);
+            completeOutputs.add(completeOutput);
+        }
+
+        // final output string is a comma-separated list of completeOutputs
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
         StringBuilder sb = new StringBuilder();
         String delim = "";
-        for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$",  "") + "*"), updateFilter)) {
-            sb.append(delim).append(file.getPath());
+        for (String output : completeOutputs) {
+            sb.append(delim).append(output);
             delim = ",";
         }
         String finalInputs = sb.toString();
-        System.out.println("This is the final sacrifice: " + finalInputs);
-        // TODO run the update iteration
+        return finalInputs;
     }
 
-    public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
-            String defaultConfPath) throws IOException {
-        run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+    public String run(String inputPath, int numReducers, int sizeKmer, int mergeRound, String defaultConfPath)
+            throws IOException {
+        return run(inputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
     }
 
-    public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
-            JobConf defaultConf) throws IOException {
-        run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+    public String run(String inputPath, int numReducers, int sizeKmer, int mergeRound, JobConf defaultConf)
+            throws IOException {
+        return run(inputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
     }
 
     public static void main(String[] args) throws Exception {
@@ -156,7 +168,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
         MergePathsH4Driver driver = new MergePathsH4Driver();
-        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
-                null, null);
+        String outputs = driver.run(options.inputPath, options.numReducers, options.sizeKmer, options.mergeRound, null, null);
+        System.out.println("Job ran.  Find outputs in " + outputs);
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java
new file mode 100644
index 0000000..a060d5f
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Convert the graph from (PositionWritable, NodeWritableWithFlag) to (NodeWritable, NullWritable) 
+ */
+@SuppressWarnings("deprecation")
+public class ConvertGraphFromNodeWithFlagToNodeWritable extends Configured implements Tool {
+
+    public static class ConvertGraphMapper extends MapReduceBase implements
+            Mapper<PositionWritable, NodeWithFlagWritable, NodeWritable, NullWritable> {
+
+        /*
+         * Convert the graph
+         */
+        @Override
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
+            output.collect(value.getNode(), NullWritable.get());
+        }
+    }
+
+    /*
+     * Convert the graph
+     */
+    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+        JobConf conf = new JobConf(baseConf);
+        conf.setJarByClass(ConvertGraphFromNodeWithFlagToNodeWritable.class);
+        conf.setJobName("Convert graph to NodeWritable " + inputPath);
+        conf.setMapOutputKeyClass(NodeWritable.class);
+        conf.setMapOutputValueClass(NullWritable.class);
+        conf.setOutputKeyClass(NodeWritable.class);
+        conf.setOutputValueClass(NullWritable.class);
+        
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        FileInputFormat.addInputPaths(conf, inputPath);
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        FileSystem dfs = FileSystem.get(conf);
+        dfs.delete(new Path(outputPath), true); // clean output dir
+        
+        conf.setMapperClass(ConvertGraphMapper.class);
+//        conf.setReducerClass(PathNodeInitialReducer.class);
+        conf.setNumReduceTasks(0);
+        RunningJob job = JobClient.runJob(conf);
+
+        return job;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        int res = ToolRunner.run(new Configuration(), new ConvertGraphFromNodeWithFlagToNodeWritable(), args);
+        return res;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int res = new ConvertGraphFromNodeWithFlagToNodeWritable().run(args);
+        System.exit(res);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 4148982..3af7069 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -43,7 +43,7 @@
         byte neighborToMeDir = mirrorDirection(neighborDir);
         byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
 
-        // clear previous kmer and edge data 
+        // clear previous kmer and edge data
         node.reset(0);
 
         // indicate the node to delete
@@ -73,8 +73,9 @@
     }
 
     /*
-     * When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge, 
-     * returns the new edge type for A->C
+     * When A->B edge type is @neighborDir and B will merge towards C along a
+     * 
+     * @mergeDir edge, returns the new edge type for A->C
      */
     public static byte flipDirection(byte neighborDir, byte mergeDir) {
         switch (mergeDir) {
@@ -106,10 +107,10 @@
     }
 
     /*
-     * Process any changes to @node contained in @updateMsg.  This includes merges and edge updates
+     * Process any changes to @self contained in @updateMsg. This includes
+     * merges and edge updates.
      */
-    public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
-            throws IOException {
+    public void processUpdates(NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
         byte updateFlag = updateMsg.getFlag();
         NodeWritable updateNode = updateMsg.getNode();
         
@@ -131,7 +132,25 @@
             node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
             node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
 
-            // merge my edges with the incoming node's edges, accounting for if the node flipped in 
+            // pass along H/T information from the merging node. flipping H ->T, T -> H
+            switch (updateFlag & MessageFlag.DIR_MASK) {
+                case MessageFlag.DIR_FF:
+                case MessageFlag.DIR_RR:
+                    flag |= (byte) (updateFlag & MessageFlag.IS_HEAD);
+                    flag |= (byte) (updateFlag & MessageFlag.IS_TAIL);
+                    break;
+                case MessageFlag.DIR_FR:
+                case MessageFlag.DIR_RF:
+                    if ((updateFlag & MessageFlag.IS_HEAD) > 0)
+                        flag |= (byte) (updateFlag & MessageFlag.IS_TAIL);
+                    if ((updateFlag & MessageFlag.IS_TAIL) > 0)
+                        flag |= (byte) (updateFlag & MessageFlag.IS_HEAD);
+                    break;
+                default:
+                    throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+            }
+
+            // merge my edges with the incoming node's edges, accounting for if the node flipped in
             // the merge and if it's my predecessor or successor
             switch (updateFlag & MessageFlag.DIR_MASK) {
                 case MessageFlag.DIR_FF:
@@ -140,27 +159,31 @@
                     node.getFRList().set(updateNode.getFRList());
                     // update isn't allowed to have any other successors (mirror & flip)
                     if (updateNode.getRFList().getCountOfPosition() > 0)
-                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards "
+                                + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_FR:
                     // flip edges
                     node.getFFList().set(updateNode.getRFList());
                     node.getFRList().set(updateNode.getRRList());
                     if (updateNode.getFFList().getCountOfPosition() > 0)
-                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards "
+                                + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_RF:
                     // flip edges
                     node.getRFList().set(updateNode.getFFList());
                     node.getRRList().set(updateNode.getFRList());
                     if (updateNode.getRRList().getCountOfPosition() > 0)
-                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards "
+                                + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_RR:
                     node.getRFList().set(updateNode.getRFList());
                     node.getRRList().set(updateNode.getRRList());
                     if (updateNode.getFRList().getCountOfPosition() > 0)
-                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards "
+                                + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 default:
                     throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
@@ -213,10 +236,7 @@
     }
 
     public NodeWritable getNode() {
-        if (node.getCount() != 0) {
-            return node;
-        }
-        return null;
+        return node;
     }
 
     public byte getFlag() {
@@ -246,7 +266,7 @@
 
     @Override
     public int hashCode() {
-        //        return super.hashCode() + flag + node.hashCode();
+        // return super.hashCode() + flag + node.hashCode();
         return flag + node.hashCode();
     }
 
@@ -258,4 +278,8 @@
         }
         return false;
     }
+
+    public void setNode(NodeWritable otherNode) {
+        node.set(otherNode);
+    }
 }
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 3c46dc7..d9b7763 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,8 +34,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -56,7 +56,6 @@
 public class PathNodeInitial extends Configured implements Tool {
 
     public static final String COMPLETE_OUTPUT = "complete";
-    public static final String TO_MERGE_OUTPUT = "toMerge";
     public static final String TO_UPDATE_OUTPUT = "toUpdate";
 
     private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
@@ -89,7 +88,6 @@
             Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private PositionWritable outputKey;
         private NodeWithFlagWritable outputValue;
         private int inDegree;
         private int outDegree;
@@ -98,7 +96,6 @@
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
-            outputKey = new PositionWritable();
         }
 
         /*
@@ -158,8 +155,8 @@
     public static class PathNodeInitialReducer extends MapReduceBase implements
             Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
-        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
         private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
+        private OutputCollector<NodeWritable, NullWritable> completeCollector;
         private int KMER_SIZE;
 
         private NodeWithFlagWritable inputValue;
@@ -169,6 +166,7 @@
         private byte inputFlag;
         private boolean sawSelf;
 
+        @Override
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
@@ -177,6 +175,11 @@
             nodeToKeep = new NodeWritable(KMER_SIZE);
         }
 
+        @Override
+        public void close() throws IOException {
+            mos.close();
+        }
+
         /*
          * Segregate nodes into three bins:
          *   1. mergeable nodes (maybe marked H or T)
@@ -191,13 +194,15 @@
         public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
                 OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
                 throws IOException {
-            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
             toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
+            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
 
             outputFlag = MessageFlag.EMPTY_MESSAGE;
             sawSelf = false;
             while (values.hasNext()) {
-                inputValue.set(values.next());
+                NodeWithFlagWritable next = values.next();
+                System.out.println(next);
+                inputValue.set(next);
                 inputFlag = inputValue.getFlag();
                 outputFlag |= inputFlag;
 
@@ -211,19 +216,23 @@
                     nodeToKeep.set(inputValue.getNode());
                 }
             }
+            if (!sawSelf) {
+                throw new IOException("Didn't see a self node in PathNodeInitial! flag: " + outputFlag);
+            }
 
-            if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+            if (!nodeToKeep.isSimpleOrTerminalPath()) {
+                // non-path nodes need updates if adjacent to path nodes
+                if ((outputFlag & NEAR_PATH) > 0) {
+                    outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+                    toUpdateCollector.collect(key, outputValue);
+                } else {
+                    // not adjacent... store in "complete" output
+                    completeCollector.collect(nodeToKeep, NullWritable.get());
+                }
+            } else {
                 if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
-                    // non-path or single path nodes
-                    if ((outputFlag & NEAR_PATH) > 0) {
-                        // non-path, but an update candidate
-                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
-                        toUpdateCollector.collect(key, outputValue);
-                    } else {
-                        // non-path or single-node path.  Store in "complete" output
-                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
-                        completeCollector.collect(key, outputValue);
-                    }
+                    // path node, but cannot merge in either direction => complete
+                    completeCollector.collect(nodeToKeep, NullWritable.get());
                 } else {
                     // path nodes that are mergeable
                     outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
@@ -238,54 +247,41 @@
                         reporter.incrCounter("genomix", "path_nodes_tails", 1);
                     }
                 }
-            } else {
-                throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
             }
         }
-
-        public void close() throws IOException {
-            mos.close();
-        }
     }
 
     /*
      * Mark the head, tail, and simple path nodes in one map-reduce job.
      */
-    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+    public RunningJob run(String inputPath, String toMergeOutput, String toUpdateOutput, String completeOutput,
             JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(PathNodeInitial.class);
         conf.setJobName("PathNodeInitial " + inputPath);
-
-        FileInputFormat.addInputPaths(conf, inputPath);
-        //        Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
-        Path outputPath = new Path(toMergeOutput);
-        FileOutputFormat.setOutputPath(conf, outputPath);
-
-        conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(NullOutputFormat.class);
-
         conf.setMapOutputKeyClass(PositionWritable.class);
         conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
         conf.setOutputValueClass(NodeWithFlagWritable.class);
 
-        conf.setMapperClass(PathNodeInitialMapper.class);
-        conf.setReducerClass(PathNodeInitialReducer.class);
-
-        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-                PositionWritable.class, NodeWithFlagWritable.class);
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        FileInputFormat.addInputPaths(conf, inputPath);
+        FileOutputFormat.setOutputPath(conf, new Path(toMergeOutput));
         MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
                 PositionWritable.class, NodeWithFlagWritable.class);
-
+        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class, NodeWritable.class,
+                NullWritable.class);
         FileSystem dfs = FileSystem.get(conf);
-        dfs.delete(outputPath, true); // clean output dir
+        dfs.delete(new Path(toMergeOutput), true); // clean output dir
+
+        conf.setMapperClass(PathNodeInitialMapper.class);
+        conf.setReducerClass(PathNodeInitialReducer.class);
         RunningJob job = JobClient.runJob(conf);
 
         // move the tmp outputs to the arg-spec'ed dirs
-        dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
-        dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
-        //        dfs.rename(outputPath, new Path(toMergeOutput));
+        dfs.rename(new Path(toMergeOutput + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+        dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
 
         return job;
     }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index c04ba46..7a65d59 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -16,7 +16,7 @@
 
 @SuppressWarnings("deprecation")
 public class TestPathMergeH3 extends GenomixMiniClusterTest {
-    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/sequence/bubble_test1.txt";
     protected String HDFS_SEQUENCE = "/00-sequence/";
     protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
     protected String HDFS_MARKPATHS = "/02-pathmark/";
@@ -42,7 +42,7 @@
         buildGraph();
     }
 
-    @Test
+//    @Test
     public void TestMergeOneIteration() throws Exception {
         cleanUpOutput();
         if (regenerateGraph) {
@@ -53,14 +53,14 @@
             copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
         }
         
-        PathNodeInitial inith3 = new PathNodeInitial();
-        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
-        copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-        copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
-        MergePathsH3Driver h3 = new MergePathsH3Driver();
-        h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
-        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+//        PathNodeInitial inith3 = new PathNodeInitial();
+//        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
+//        copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+//        copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+//
+//        MergePathsH3Driver h3 = new MergePathsH3Driver();
+//        h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+//        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
     }
 
 
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 45a2d0a..249af29 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -10,55 +10,69 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Test;
 
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
-import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
 import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
 import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 
 @SuppressWarnings("deprecation")
 public class TestPathMergeH4 extends HadoopMiniClusterTest {
-    protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
-    protected final  String SEQUENCE = "/00-sequence/";
-    protected final String GRAPHBUILD = "/01-graphbuild/";
-    protected final String MERGED = "/02-pathmerge/";
-    
-    protected final String ACTUAL = "src/test/resources/actual/";
-    
-    protected final boolean regenerateGraph = true;
-    
+    protected String SEQUENCE = "/sequence/";
+    protected String GRAPHBUILD = "/graphbuild-unmerged/";
+    protected String MERGED = "/pathmerge/";
+    protected String LOCAL_SEQUENCE_FILE;
+    protected String readsFile;
+    protected boolean regenerateGraph;
     {
-        KMER_LENGTH = 5;
-        READ_LENGTH = 8;
         HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
-        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
-        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
     }
-
+    
+    public void setupTestConf(int kmerLength, int readLength, boolean regenerateGraph, String readsFile) {
+        KMER_LENGTH = kmerLength;
+        READ_LENGTH = readLength;
+        this.readsFile = readsFile;
+        this.regenerateGraph = regenerateGraph;
+        LOCAL_SEQUENCE_FILE = DATA_ROOT + SEQUENCE + readsFile;
+    }
+    
     @Test
-    public void TestMergeOneIteration() throws Exception {
-        cleanUpOutput();
-        prepareGraph();
-        
-        MergePathsH4Driver h4 = new MergePathsH4Driver();
-        h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 5, conf);
-        copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
+    public void testTwoReads() throws Exception {
+        setupTestConf(5, 8, false, "tworeads.txt");
+//        testPathNode();
+        testMergeOneIteration();
+//        testMergeToCompletion();
     }
-
+    
 //    @Test
+    public void testSimpleText() throws Exception {
+        setupTestConf(5, 8, false, "text.txt");
+        testPathNode();
+        testMergeOneIteration();
+//        testMergeToCompletion();
+    }
+    
     public void testPathNode() throws IOException {
         cleanUpOutput();
         prepareGraph();
 
         // identify head and tail nodes with pathnode initial
         PathNodeInitial inith4 = new PathNodeInitial();
-        inith4.run(GRAPHBUILD, "/toMerge", "/completed", conf);
+        inith4.run(GRAPHBUILD, "/toMerge", "/toUpdate", "/completed", conf);
+        copyResultsToLocal("/toMerge", ACTUAL_ROOT + "path_toMerge", false, conf);
+        copyResultsToLocal("/toUpdate", ACTUAL_ROOT + "path_toUpdate", false, conf);
+        copyResultsToLocal("/completed", ACTUAL_ROOT + "path_completed", false, conf);
     }
-    
-    
 
+    public void testMergeOneIteration() throws Exception {
+        cleanUpOutput();
+        prepareGraph();
+        
+        MergePathsH4Driver h4 = new MergePathsH4Driver();
+        String outputs = h4.run(GRAPHBUILD, 2, KMER_LENGTH, 1, conf);
+        int i=0;
+        for (String out : outputs.split(",")) {
+            copyResultsToLocal(out, ACTUAL_ROOT + MERGED + i++, false, conf);
+        }
+    }
 
     public void buildGraph() throws IOException {
         JobConf buildConf = new JobConf(conf);  // use a separate conf so we don't interfere with other jobs 
@@ -76,9 +90,9 @@
         if (regenerateGraph) {
             copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
             buildGraph();
-            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
         } else {
-            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
         }
     }
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
index 7f1e1d7..1f4a2d2 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -76,9 +76,9 @@
                     new Path(localDestFile), false, conf, null);
         } else {
             // file is binary
-            // save the entire binary output dir
-            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
-                    new Path(localDestFile + ".bindir"), false, conf);
+//            // save the entire binary output dir
+//            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+//                    new Path(localDestFile + ".bindir"), false, conf);
 
             // also load the Nodes and write them out as text locally. 
             FileSystem lfs = FileSystem.getLocal(new Configuration());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index 9a113f1..89f9e96 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -40,8 +40,9 @@
     // subclass should modify this to include the HDFS directories that should be cleaned up
     protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
 
-    protected static String EXPECTED_ROOT = "src/test/resources/expected/";
-    protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+    protected static final String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected static final String ACTUAL_ROOT = "src/test/resources/actual/";
+    protected static final String DATA_ROOT = "src/test/resources/data/";
 
     protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
     protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt
new file mode 100644
index 0000000..b450bd5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt
@@ -0,0 +1,3 @@
+1	ACGTA

+2	AAATA

+

diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt
new file mode 100644
index 0000000..ee05672
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt
@@ -0,0 +1,4 @@
+1	AAAACGTAT

+2	GGGAATACG

+3	CGTATTCCC

+

diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt
new file mode 100644
index 0000000..154dc8c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt
@@ -0,0 +1,3 @@
+1	ACGGTGTA

+2	ACCGTGGT

+

diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt
new file mode 100644
index 0000000..63a95ad
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt
@@ -0,0 +1 @@
+1	AATAGAAG

diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/text.txt
old mode 100755
new mode 100644
similarity index 100%
rename from genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
rename to genomix/genomix-hadoop/src/test/resources/data/sequence/text.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt
new file mode 100644
index 0000000..3e3bf7b
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt
@@ -0,0 +1,2 @@
+1	AATAGAAG

+2	AGAAGCCC

diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt
new file mode 100644
index 0000000..a2988b8
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt
@@ -0,0 +1,6 @@
+((1,1)	[(1,3)]	[]	[]	[]	AATAGA)	(null)
+((1,3)	[(2,1),(1,4)]	[]	[]	[(1,1)]	TAGAA)	(null)
+((1,4)	[(2,2)]	[]	[]	[(1,3)]	AGAAG)	(null)
+((2,1)	[(2,2)]	[]	[]	[(1,3)]	AGAAG)	(null)
+((2,2)	[(2,3)]	[]	[]	[(1,4),(2,1)]	GAAGC)	(null)
+((2,3)	[]	[]	[]	[(2,2)]	AAGCCC)	(null)
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt
new file mode 100644
index 0000000..036bbfb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt
@@ -0,0 +1,24 @@
+((2,1)	[(2,2)]	[]	[]	[]	AATAG)	(null)
+((2,2)	[(2,3)]	[]	[]	[(2,1)]	ATAGA)	(null)
+((2,3)	[(6,1),(2,4)]	[]	[]	[(2,2)]	TAGAA)	(null)
+((2,4)	[(6,2)]	[]	[]	[(2,3)]	AGAAG)	(null)
+((4,1)	[(4,2)]	[]	[]	[]	AATAG)	(null)
+((4,2)	[(4,3)]	[]	[]	[(4,1)]	ATAGA)	(null)
+((4,3)	[(6,1),(4,4)]	[]	[]	[(4,2)]	TAGAA)	(null)
+((4,4)	[(6,2)]	[]	[]	[(4,3)]	AGAAG)	(null)
+((6,1)	[(6,2)]	[]	[]	[(2,3),(1,3),(3,3),(4,3),(5,3)]	AGAAG)	(null)
+((6,2)	[(6,3)]	[]	[]	[(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)]	GAAGA)	(null)
+((6,3)	[(6,4)]	[]	[]	[(6,2)]	AAGAA)	(null)
+((6,4)	[]	[]	[]	[(6,3)]	AGAAG)	(null)
+((1,1)	[(1,2)]	[]	[]	[]	AATAG)	(null)
+((1,2)	[(1,3)]	[]	[]	[(1,1)]	ATAGA)	(null)
+((1,3)	[(6,1),(1,4)]	[]	[]	[(1,2)]	TAGAA)	(null)
+((1,4)	[(6,2)]	[]	[]	[(1,3)]	AGAAG)	(null)
+((3,1)	[(3,2)]	[]	[]	[]	AATAG)	(null)
+((3,2)	[(3,3)]	[]	[]	[(3,1)]	ATAGA)	(null)
+((3,3)	[(6,1),(3,4)]	[]	[]	[(3,2)]	TAGAA)	(null)
+((3,4)	[(6,2)]	[]	[]	[(3,3)]	AGAAG)	(null)
+((5,1)	[(5,2)]	[]	[]	[]	AATAG)	(null)
+((5,2)	[(5,3)]	[]	[]	[(5,1)]	ATAGA)	(null)
+((5,3)	[(6,1),(5,4)]	[]	[]	[(5,2)]	TAGAA)	(null)
+((5,4)	[(6,2)]	[]	[]	[(5,3)]	AGAAG)	(null)
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt
new file mode 100644
index 0000000..036bbfb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt
@@ -0,0 +1,24 @@
+((2,1)	[(2,2)]	[]	[]	[]	AATAG)	(null)
+((2,2)	[(2,3)]	[]	[]	[(2,1)]	ATAGA)	(null)
+((2,3)	[(6,1),(2,4)]	[]	[]	[(2,2)]	TAGAA)	(null)
+((2,4)	[(6,2)]	[]	[]	[(2,3)]	AGAAG)	(null)
+((4,1)	[(4,2)]	[]	[]	[]	AATAG)	(null)
+((4,2)	[(4,3)]	[]	[]	[(4,1)]	ATAGA)	(null)
+((4,3)	[(6,1),(4,4)]	[]	[]	[(4,2)]	TAGAA)	(null)
+((4,4)	[(6,2)]	[]	[]	[(4,3)]	AGAAG)	(null)
+((6,1)	[(6,2)]	[]	[]	[(2,3),(1,3),(3,3),(4,3),(5,3)]	AGAAG)	(null)
+((6,2)	[(6,3)]	[]	[]	[(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)]	GAAGA)	(null)
+((6,3)	[(6,4)]	[]	[]	[(6,2)]	AAGAA)	(null)
+((6,4)	[]	[]	[]	[(6,3)]	AGAAG)	(null)
+((1,1)	[(1,2)]	[]	[]	[]	AATAG)	(null)
+((1,2)	[(1,3)]	[]	[]	[(1,1)]	ATAGA)	(null)
+((1,3)	[(6,1),(1,4)]	[]	[]	[(1,2)]	TAGAA)	(null)
+((1,4)	[(6,2)]	[]	[]	[(1,3)]	AGAAG)	(null)
+((3,1)	[(3,2)]	[]	[]	[]	AATAG)	(null)
+((3,2)	[(3,3)]	[]	[]	[(3,1)]	ATAGA)	(null)
+((3,3)	[(6,1),(3,4)]	[]	[]	[(3,2)]	TAGAA)	(null)
+((3,4)	[(6,2)]	[]	[]	[(3,3)]	AGAAG)	(null)
+((5,1)	[(5,2)]	[]	[]	[]	AATAG)	(null)
+((5,2)	[(5,3)]	[]	[]	[(5,1)]	ATAGA)	(null)
+((5,3)	[(6,1),(5,4)]	[]	[]	[(5,2)]	TAGAA)	(null)
+((5,4)	[(6,2)]	[]	[]	[(5,3)]	AGAAG)	(null)