Merge branch 'anbangx/fullstack_genomix' of https://code.google.com/p/hyracks into anbangx/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 416ab49..be65d55 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -51,7 +53,7 @@
     
     public static byte getPairedGeneCode(byte genecode){
         if ( genecode < 0 || genecode > 3){
-            throw new IllegalArgumentException("Invalid genecode");
+            throw new IllegalArgumentException("Invalid genecode: " + genecode);
         }
         return (byte) (3- genecode);
     }
@@ -66,4 +68,30 @@
         }
         return GENE_SYMBOL[code];
     }
+    
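+    /**
+     * Return the reverse complement of the given DNA string,
+     * e.g. reverseComplement("GGTTGTT") returns "AACAACC".
+     */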
+    public static String reverseComplement(String kmer) {
+        StringBuilder sb = new StringBuilder();
+        for (char letter : kmer.toCharArray()) {
+            sb.append(complement(letter));
+        }
+        return sb.reverse().toString();
+    }
+    
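+    /** Return the complement of a single base (case-insensitive): A<->T, C<->G; throws on any other character. */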
+    public static char complement(char ch) {
+        switch (ch) {
+            case 'A':
+            case 'a':
+                return 'T';
+            case 'C':
+            case 'c':
+                return 'G';
+            case 'G':
+            case 'g':
+                return 'C';
+            case 'T':
+            case 't':
+                return 'A';
+        }
+        throw new RuntimeException("Invalid character given in complement: " + ch);
+    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 50baeb4..17f1a11 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -50,7 +50,7 @@
     public KmerBytesWritable(int k, byte[] storage, int offset) {
         setNewReference(k, storage, offset);
     }
-    
+
     public KmerBytesWritable(int k, String kmer) {
         setNewReference(kmer.length(), kmer.getBytes(), 0);
     }
@@ -198,11 +198,16 @@
         if (pos >= kmerlength) {
             throw new IllegalArgumentException("gene position out of bound");
         }
+        return geneCodeAtPosition(pos);
+    }
+    
+    // Unchecked version of getGeneCodeAtPosition; used when kmerlength is temporarily inaccurate (mid-merge)
+    private byte geneCodeAtPosition(int pos) {
         int posByte = pos / 4;
         int shift = (pos % 4) << 1;
         return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
     }
-
+    
     public int getKmerLength() {
         return this.kmerlength;
     }
@@ -267,7 +272,8 @@
         byte l = 0;
         int bytecount = 0;
         int bcount = size - 1;
-        for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
+        for (int i = start + kmerlength - 1; i >= start && i < array.length; i--) {
             byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
             l |= (byte) (code << bytecount);
             bytecount += 2;
@@ -358,7 +364,7 @@
      * @param kmer
      *            : the next kmer
      */
-    public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+    public void mergeWithFFKmer(int initialKmerSize, KmerBytesWritable kmer) {
         int preKmerLength = kmerlength;
         int preSize = size;
         this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
@@ -374,6 +380,97 @@
     }
 
     /**
+     * Merge Kmer with the next connected Kmer, when that Kmer needs to be reverse-complemented
+     * e.g. AAGCTAA merge with GGTTGTT, if the initial kmerSize = 3
+     * then it will return AAGCTAACAACC
+     * 
+     * A merge B =>  A B~
+     * 
+     * @param initialKmerSize
+     *            : the initial kmerSize
+     * @param kmer
+     *            : the next kmer
+     */
+    public void mergeWithFRKmer(int initialKmerSize, KmerBytesWritable kmer) {
+        int preSize = size;
+        int preKmerLength = kmerlength;
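+        // the merged kmer gains (kmer.kmerlength - initialKmerSize + 1) bases: the reverse complement
+        // of kmer, minus the (initialKmerSize - 1) bases that overlap this kmer's suffix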
+        this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+        // copy prefix into right-side of buffer
+        for (int i = 1; i <= preSize; i++) {
+            bytes[offset + size - i] = bytes[offset + preSize - i];
+        }
+
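+        // fold in the reverse complement of kmer (skipping the (initialKmerSize - 1)-base overlap)
+        // two bits at a time, filling bytes from bcount down toward the front of the buffer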
+        int bytecount = (preKmerLength % 4) * 2;
+        int bcount = size - preSize - bytecount / 8; // may overlap previous kmer
+        byte l = bcount == size - preSize ? bytes[offset + bcount] : 0x00;
+        bytecount %= 8;
+        for (int i = kmer.kmerlength - initialKmerSize; i >= 0; i--) {
+            byte code = GeneCode.getPairedGeneCode(kmer.getGeneCodeAtPosition(i));
+            l |= (byte) (code << bytecount);
+            bytecount += 2;
+            if (bytecount == 8) {
+                bytes[offset + bcount--] = l;
+                l = 0;
+                bytecount = 0;
+            }
+        }
+        if (bcount >= 0) {
+            bytes[offset] = l;
+        }
+    }
+
+    /**
+     * Merge Kmer with the previous connected Kmer, when that kmer needs to be reverse-complemented
+     * e.g. AACAACC merge with TTCTGCC, if the initial kmerSize = 3
+     * then it will return GGCAGAACAACC
+     * 
+     * @param initialKmerSize
+     *            : the initial kmerSize
+     * @param preKmer
+     *            : the previous kmer
+     */
+    public void mergeWithRFKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+        int preKmerLength = kmerlength;
+        int preSize = size;
+        this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
+        setSize(KmerUtil.getByteNumFromK(kmerlength));
+
+        int byteIndex = size - 1;
+        byte cacheByte = 0x00;
+        int posnInByte = 0;
+
+        // copy rc of preKmer into high bytes
+        for (int i = preKmer.kmerlength - 1; i >= initialKmerSize - 1; i--) {
+            byte code = GeneCode.getPairedGeneCode(preKmer.getGeneCodeAtPosition(i));
+            cacheByte |= (byte) (code << posnInByte);
+            posnInByte += 2;
+            if (posnInByte == 8) {
+                bytes[offset + byteIndex--] = cacheByte;
+                cacheByte = 0;
+                posnInByte = 0;
+            }
+        }
+        
+        // copy my kmer into low positions of bytes
+        for (int i = 0; i < preKmerLength; i++) {
+            // the capacity expansion above shifted this kmer's bytes; each position is off by the number of bytes added
+            int newposn = i + (size - preSize) * 4;
+            byte code = geneCodeAtPosition(newposn);
+            cacheByte |= (byte) (code << posnInByte);
+            posnInByte += 2;
+            if (posnInByte == 8) {
+                bytes[offset + byteIndex--] = cacheByte;
+                cacheByte = 0;
+                posnInByte = 0;
+            }
+        }
+        bytes[offset] = cacheByte;
+        clearLeadBit();
+    }
+
+    /**
      * Merge Kmer with the previous connected Kmer
      * e.g. AACAACC merge with AAGCTAA, if the initial kmerSize = 3
      * then it will return AAGCTAACAACC
@@ -383,7 +480,7 @@
      * @param preKmer
      *            : the previous kmer
      */
-    public void mergePreKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+    public void mergeWithRRKmer(int initialKmerSize, KmerBytesWritable preKmer) {
         int preKmerLength = kmerlength;
         int preSize = size;
         this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
@@ -417,7 +514,7 @@
 
         buffer[position] = (byte) ((buffer[position] & mask) | ((0xff & onebyte) << shift));
         if (position > start && shift != 0) {
-            buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >> (8 - shift))));
+            buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >>> (8 - shift))));
         }
     }
 
@@ -427,7 +524,7 @@
             throw new IllegalArgumentException("Buffer of kmer storage is invalid");
         }
         int shift = (k % 4) << 1;
-        byte data = (byte) (((0xff) & buffer[position]) >> shift);
+        byte data = (byte) (((0xff) & buffer[position]) >>> shift);
         if (shift != 0 && position > start) {
             data |= 0xff & (buffer[position - 1] << (8 - shift));
         }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 54abda3..2c64139 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -27,6 +27,17 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+    public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
+
+    // merge/update directions
+    public static class DirectionFlag {
+        public static final byte DIR_FF = 0b00 << 0;
+        public static final byte DIR_FR = 0b01 << 0;
+        public static final byte DIR_RF = 0b10 << 0;
+        public static final byte DIR_RR = 0b11 << 0;
+        public static final byte DIR_MASK = 0b11 << 0;
+    }
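+    // e.g. (flag & DirectionFlag.DIR_MASK) == DirectionFlag.DIR_FR selects the forward-reverse
+    // edge list (see getListFromDir below)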
+
     private PositionWritable nodeID;
     private PositionListWritable forwardForwardList;
     private PositionListWritable forwardReverseList;
@@ -57,9 +68,9 @@
         reverseReverseList.set(RRList);
         kmer.set(kmer);
     }
-    
+
     public void set(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
-            PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer){
+            PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
         this.nodeID.set(nodeID);
         this.forwardForwardList.set(FFList);
         this.forwardReverseList.set(FRList);
@@ -105,6 +116,21 @@
         return reverseReverseList;
     }
 
+    public PositionListWritable getListFromDir(byte dir) {
+        switch (dir & DirectionFlag.DIR_MASK) {
+            case DirectionFlag.DIR_FF:
+                return getFFList();
+            case DirectionFlag.DIR_FR:
+                return getFRList();
+            case DirectionFlag.DIR_RF:
+                return getRFList();
+            case DirectionFlag.DIR_RR:
+                return getRRList();
+            default:
+                throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
+        }
+    }
+
     public PositionWritable getNodeID() {
         return nodeID;
     }
@@ -120,13 +146,13 @@
     public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
         this.forwardForwardList.set(nextNode.forwardForwardList);
         this.forwardReverseList.set(nextNode.forwardReverseList);
-        kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
+        kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
     }
 
     public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
         this.reverseForwardList.set(preNode.reverseForwardList);
         this.reverseReverseList.set(preNode.reverseReverseList);
-        kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
+        kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
     }
 
     public void set(NodeWritable node) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index ffd2805..c793609 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -131,6 +131,20 @@
         };
         return it;
     }
+    
+    /*
+     * Remove the first instance of toRemove using a linear scan. Throws an exception if it is not in this list.
+     */
+    public void remove(PositionWritable toRemove) {
+        Iterator<PositionWritable> posIterator = this.iterator();
+        while (posIterator.hasNext()) {
+            if(toRemove.equals(posIterator.next())) {
+                posIterator.remove();
+                return;
+            }
+        }
+        throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+    }
 
     public void set(PositionListWritable list2) {
         set(list2.valueCount, list2.storage, list2.offset);
@@ -162,6 +176,20 @@
         valueCount += 1;
     }
     
+    /*
+     * Append otherList to the end of this list
+     */
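+    // note: positions from otherList are copied as-is; no deduplication is attempted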
+    public void appendList(PositionListWritable otherList) {
+        if (otherList.valueCount > 0) {
+            setSize((valueCount + otherList.valueCount) * PositionWritable.LENGTH);
+            // copy contents of otherList into the end of my storage
+            System.arraycopy(otherList.storage, otherList.offset,
+                    storage, offset + valueCount * PositionWritable.LENGTH, 
+                    otherList.valueCount * PositionWritable.LENGTH);
+            valueCount += otherList.valueCount;
+        }
+    }
+    
     public static int getCountByDataLength(int length) {
         if (length % PositionWritable.LENGTH != 0) {
             for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 5a59a87..25353f0 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -115,7 +119,7 @@
     }
 
     @Test
-    public void TestMergeNextKmer() {
+    public void TestMergeFFKmer() {
         byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
         String text = "AGCTGACCGT";
         KmerBytesWritable kmer1 = new KmerBytesWritable(8);
@@ -127,12 +131,12 @@
         Assert.assertEquals(text2, kmer2.toString());
         KmerBytesWritable merge = new KmerBytesWritable(kmer1);
         int kmerSize = 8;
-        merge.mergeNextKmer(kmerSize, kmer2);
+        merge.mergeWithFFKmer(kmerSize, kmer2);
         Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
 
         for (int i = 1; i < 8; i++) {
             merge.set(kmer1);
-            merge.mergeNextKmer(i, kmer2);
+            merge.mergeWithFFKmer(i, kmer2);
             Assert.assertEquals(text1 + text2.substring(i - 1), merge.toString());
         }
 
@@ -148,15 +152,92 @@
                 Assert.assertEquals(text2, kmer2.toString());
                 for (int x = 1; x < jk; x++) {
                     merge.set(kmer1);
-                    merge.mergeNextKmer(x, kmer2);
+                    merge.mergeWithFFKmer(x, kmer2);
                     Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
                 }
             }
         }
     }
+    
+    @Test
+    public void TestMergeFRKmer() {
+        int kmerSize = 3;
+        String result = "AAGCTAACAACC";
+        byte[] resultArray = result.getBytes();
+        
+        String text1 = "AAGCTAA";
+        KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+        kmer1.setByRead(resultArray, 0);
+        Assert.assertEquals(text1, kmer1.toString());
+        
+        // kmer2 is the rc of the end of the read
+        String text2 = "GGTTGTT";
+        KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+        kmer2.setByReadReverse(resultArray, result.length() - text2.length());
+        Assert.assertEquals(text2, kmer2.toString());
+        
+        KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+        merge.mergeWithFRKmer(kmerSize, kmer2);
+        Assert.assertEquals(result, merge.toString());
+        
+        int i = 1;
+        merge.set(kmer1);
+        merge.mergeWithFRKmer(i, kmer2);
+        Assert.assertEquals("AAGCTAAAACAACC", merge.toString());
+        
+        i = 2;
+        merge.set(kmer1);
+        merge.mergeWithFRKmer(i, kmer2);
+        Assert.assertEquals("AAGCTAAACAACC", merge.toString());
+        
+        i = 3;
+        merge.set(kmer1);
+        merge.mergeWithFRKmer(i, kmer2);
+        Assert.assertEquals("AAGCTAACAACC", merge.toString());
+    }
+
+    @Test
+    public void TestMergeRFKmer() {
+        int kmerSize = 3;
+        String result = "GGCACAACAACCC";
+        byte[] resultArray = result.getBytes();
+        
+        String text1 = "AACAACCC";
+        KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+        kmer1.setByRead(resultArray, 5);
+        Assert.assertEquals(text1, kmer1.toString());
+        
+        // kmer2 is the rc of the end of the read
+        String text2 = "TTGTGCC";
+        KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+        kmer2.setByReadReverse(resultArray, 0);
+        Assert.assertEquals(text2, kmer2.toString());
+        
+        KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+        merge.mergeWithRFKmer(kmerSize, kmer2);
+        Assert.assertEquals(result, merge.toString());
+        
+        int i = 1;
+        merge.set(kmer1);
+        merge.mergeWithRFKmer(i, kmer2);
+        Assert.assertEquals("GGCACAAAACAACCC", merge.toString());
+        
+        i = 2;
+        merge.set(kmer1);
+        merge.mergeWithRFKmer(i, kmer2);
+        Assert.assertEquals("GGCACAAACAACCC", merge.toString());
+        
+        i = 3;
+        merge.set(kmer1);
+        merge.mergeWithRFKmer(i, kmer2);
+        Assert.assertEquals("GGCACAACAACCC", merge.toString());
+    }
+
     @Test
-    public void TestMergePreKmer() {
+    public void TestMergeRRKmer() {
         byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
         String text = "AGCTGACCGT";
         KmerBytesWritable kmer1 = new KmerBytesWritable(8);
@@ -168,12 +249,12 @@
         Assert.assertEquals(text2, kmer2.toString());
         KmerBytesWritable merge = new KmerBytesWritable(kmer2);
         int kmerSize = 8;
-        merge.mergePreKmer(kmerSize, kmer1);
+        merge.mergeWithRRKmer(kmerSize, kmer1);
         Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
 
         for (int i = 1; i < 8; i++) {
             merge.set(kmer2);
-            merge.mergePreKmer(i, kmer1);
+            merge.mergeWithRRKmer(i, kmer1);
             Assert.assertEquals(text1.substring(0, text1.length() - i + 1) + text2, merge.toString());
         }
 
@@ -189,7 +270,7 @@
                 Assert.assertEquals(text2, kmer2.toString());
                 for (int x = 1; x < ik; x++) {
                     merge.set(kmer2);
-                    merge.mergePreKmer(x, kmer1);
+                    merge.mergeWithRRKmer(x, kmer1);
                     Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
                 }
             }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index c2b0e52..a7ca739 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -23,7 +23,8 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -32,28 +33,10 @@
     /*
      * Flags used when sending messages
      */
-    public static class MessageFlag {
-        public static final byte EMPTY_MESSAGE = 0;
-        public static final byte FROM_SELF = 1;
-        public static final byte FROM_SUCCESSOR = 1 << 1;
-        public static final byte FROM_PREDECESSOR = 1 << 2;
-        public static final byte IS_HEAD = 1 << 3;
-        public static final byte IS_TAIL = 1 << 4;
-        public static final byte IS_PSEUDOHEAD = 1 << 5;
-        public static final byte IS_COMPLETE = 1 << 6;
-
-        public static String getFlagAsString(byte code) {
-            // TODO: allow multiple flags to be set
-            switch (code) {
-                case EMPTY_MESSAGE:
-                    return "EMPTY_MESSAGE";
-                case FROM_SELF:
-                    return "FROM_SELF";
-                case FROM_SUCCESSOR:
-                    return "FROM_SUCCESSOR";
-            }
-            return "ERROR_BAD_MESSAGE";
-        }
+    public static class MergeMessageFlag extends PathNodeFlag {
+        public static final byte FROM_SUCCESSOR = 1 << 5;
+        public static final byte FROM_PREDECESSOR = 1 << 6;
+        public static final byte IS_PSEUDOHEAD = ((byte) 1 << 6); // TODO FIXME: collides with FROM_PREDECESSOR (also 1 << 6)
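+        // EMPTY_MESSAGE, MSG_SELF, IS_HEAD, and IS_TAIL (used below) are inherited from PathNodeFlag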
     }
 
     /*
@@ -61,14 +44,14 @@
      * Heads send themselves to their successors, and all others map themselves.
      */
     private static class MergePathsH3Mapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private static long randSeed;
         private Random randGenerator;
         private float probBeingRandomHead;
 
         private int KMER_SIZE;
         private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private byte headFlag;
         private byte outFlag;
@@ -81,7 +64,7 @@
             finalMerge = conf.getBoolean("finalMerge", false);
 
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
             curNode = new NodeWritable(KMER_SIZE);
         }
@@ -93,34 +76,34 @@
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
             curNode = value.getNode();
             // Map all path vertices; Heads and pseudoheads are sent to their successors
             // NOTE: all mapping nodes are already simple paths
 
             // Node may be marked as head b/c it's a real head, it's a previously merged head, or the node appears as a random head
-            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+            headFlag = (byte) (MergeMessageFlag.IS_HEAD & value.getFlag());
             // remove all pseudoheads on the last iteration
             if (!finalMerge) {
-                headFlag |= (MessageFlag.IS_PSEUDOHEAD & value.getFlag());
+                headFlag |= (MergeMessageFlag.IS_PSEUDOHEAD & value.getFlag());
             }
 
-            outFlag = (byte) (headFlag | (MessageFlag.IS_TAIL & value.getFlag()));
+            outFlag = (byte) (headFlag | (MergeMessageFlag.IS_TAIL & value.getFlag()));
             if (headFlag != 0 || isNodeRandomHead(curNode.getNodeID())) {
                 // head nodes send themselves to their successor
                 //outputKey.set(curNode.getOutgoingList().getPosition(0));
                 if (!finalMerge) {
-                    headFlag |= (MessageFlag.IS_PSEUDOHEAD & value.getFlag());
+                    headFlag |= (MergeMessageFlag.IS_PSEUDOHEAD & value.getFlag());
                 }
-                outFlag |= MessageFlag.FROM_PREDECESSOR;
+                outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
 
                 outputValue.set(outFlag, curNode);
                 output.collect(outputKey, outputValue);
             } else {
                 // tail nodes map themselves
-                outFlag |= MessageFlag.FROM_SELF;
+                outFlag |= MergeMessageFlag.MSG_SELF;
                 outputValue.set(outFlag, curNode);
                 output.collect(key, outputValue);
             }
@@ -131,11 +114,11 @@
      * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
      */
     private static class MergePathsH3Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable headNode;
         private NodeWritable tailNode;
         private int count;
@@ -143,20 +126,20 @@
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             headNode = new NodeWritable(KMER_SIZE);
             tailNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
 
             inputValue = values.next();
             if (!values.hasNext()) {
                 // all single nodes must be remapped
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+                if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) == MergeMessageFlag.MSG_SELF) {
                     // FROM_SELF => remap self
                     output.collect(key, inputValue);
                 } else {
@@ -166,11 +149,11 @@
             } else {
                 // multiple inputs => a merge will take place. Aggregate both, then collect the merged path
                 count = 0;
-                outFlag = MessageFlag.EMPTY_MESSAGE;
+                outFlag = MergeMessageFlag.EMPTY_MESSAGE;
                 while (true) { // process values; break when no more
                     count++;
-                    outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_PSEUDOHEAD | MessageFlag.IS_TAIL));
-                    if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+                    outFlag |= (inputValue.getFlag() & (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_PSEUDOHEAD | MergeMessageFlag.IS_TAIL));
+                    if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) == MergeMessageFlag.FROM_PREDECESSOR) {
                         headNode.set(inputValue.getNode());
                     } else {
                         tailNode.set(inputValue.getNode());
@@ -188,12 +171,12 @@
                 //headNode.mergeNext(tailNode, KMER_SIZE);
                 outputValue.set(outFlag, headNode);
 
-                if ((outFlag & MessageFlag.IS_TAIL) == MessageFlag.IS_TAIL) {
+                if ((outFlag & MergeMessageFlag.IS_TAIL) == MergeMessageFlag.IS_TAIL) {
                     // Pseudoheads merging with tails don't become heads.
                     // Reset the IS_PSEUDOHEAD flag
-                    outFlag &= ~MessageFlag.IS_PSEUDOHEAD;
+                    outFlag &= ~MergeMessageFlag.IS_PSEUDOHEAD;
 
-                    if ((outFlag & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD) {
+                    if ((outFlag & MergeMessageFlag.IS_HEAD) == MergeMessageFlag.IS_HEAD) {
                         // True heads meeting tails => merge is complete for this node
                         // TODO: send to the "complete" collector
                     }
@@ -219,9 +202,9 @@
         conf.setOutputFormat(SequenceFileOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(MergePathsH3Mapper.class);
         conf.setReducerClass(MergePathsH3Reducer.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index b240833..1fef016 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,6 +1,23 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -21,30 +38,55 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
 
+/*
+ * A probabilistic merge algorithm for merging long single paths (chains with only 1 incoming and 1 outgoing edge).
+ * The merge is guaranteed to succeed, but not all nodes that could be merged in an iteration will be.
+ * 
+ * There are two steps to the merge: 
+ *    1. (H4UpdatesMapper & H4UpdatesReducer): the direction of the merge is chosen and all 
+ *       neighbors' edges are updated with the merge intent 
+ *    2. (H4MergeMapper & H4MergeReducer): the nodes initiating the merge are "sent" to their neighbors, kmers are
+ *       combined, and edges are again updated (since the merge-initiator may be a neighbor of another merging node).
+ */
 @SuppressWarnings("deprecation")
 public class MergePathsH4 extends Configured implements Tool {
 
+    private enum MergeDir {
+        NO_MERGE,
+        FORWARD,
+        BACKWARD
+    }
+
     /*
-     * Mapper class: Partition the graph using random pseudoheads.
-     * Heads send themselves to their successors, and all others map themselves.
+     * Mapper class: randomly chooses a direction to merge s.t. if a merge takes place, it will be successful.
+     *      Sends update messages telling each of this node's neighbors who its new neighbor will be
      */
-    public static class MergePathsH4Mapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+    public static class H4UpdatesMapper extends MapReduceBase implements
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private static long randSeed;
         private Random randGenerator;
         private float probBeingRandomHead;
 
         private int KMER_SIZE;
-        private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
+        private NodeWithFlagWritable mergeMsgValue;
+        private NodeWithFlagWritable updateMsgValue;
+
         private NodeWritable curNode;
         private PositionWritable curID;
         private PositionWritable nextID;
@@ -54,20 +96,26 @@
         private boolean curHead;
         private boolean nextHead;
         private boolean prevHead;
-        private boolean willMerge;
+        private MergeDir mergeDir;
+        private byte inFlag;
         private byte headFlag;
         private byte tailFlag;
-        private byte outFlag;
+        private byte mergeMsgFlag;
+        private byte nextDir;
+        private byte prevDir;
 
         public void configure(JobConf conf) {
-            
+
             randSeed = conf.getLong("randomSeed", 0);
             randGenerator = new Random(randSeed);
             probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
 
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputKey = new PositionWritable();
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+            mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+            updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
             curNode = new NodeWritable(KMER_SIZE);
             curID = new PositionWritable();
             nextID = new PositionWritable();
@@ -85,11 +133,13 @@
          */
         protected boolean setNextInfo(NodeWritable node) {
             if (node.getFFList().getCountOfPosition() > 0) {
+                nextDir = MessageFlag.DIR_FF;
                 nextID.set(node.getFFList().getPosition(0));
                 nextHead = isNodeRandomHead(nextID);
                 return true;
             }
             if (node.getFRList().getCountOfPosition() > 0) {
+                nextDir = MessageFlag.DIR_FR;
                 nextID.set(node.getFRList().getPosition(0));
                 nextHead = isNodeRandomHead(nextID);
                 return true;
@@ -102,11 +152,13 @@
          */
         protected boolean setPrevInfo(NodeWritable node) {
             if (node.getRRList().getCountOfPosition() > 0) {
+                prevDir = MessageFlag.DIR_RR;
                 prevID.set(node.getRRList().getPosition(0));
                 prevHead = isNodeRandomHead(prevID);
                 return true;
             }
             if (node.getRFList().getCountOfPosition() > 0) {
+                prevDir = MessageFlag.DIR_RF;
                 prevID.set(node.getRFList().getPosition(0));
                 prevHead = isNodeRandomHead(prevID);
                 return true;
@@ -115,51 +167,34 @@
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
-            // Node may be marked as head b/c it's a real head or a real tail
-            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
-            tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
-            outFlag = (byte) (headFlag | tailFlag);
-            
-            // only PATH vertices are present. Find the ID's for my neighbors
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            inFlag = value.getFlag();
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
-            
+
+            headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
+            tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
+            mergeMsgFlag = (byte) (headFlag | tailFlag);
+
             curHead = isNodeRandomHead(curID);
             // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
             // We prevent merging towards non-path nodes
             hasNext = setNextInfo(curNode) && tailFlag == 0;
             hasPrev = setPrevInfo(curNode) && headFlag == 0;
-            willMerge = false;
-            
-            reporter.setStatus("CHECK ME OUT");
-            System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+            mergeDir = MergeDir.NO_MERGE; // no merge to happen
 
-            // TODO: need to update edges in neighboring nodes
-            
-            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
-                // true HEAD met true TAIL. this path is complete
-                outFlag |= MessageFlag.FROM_SELF;
-                outputValue.set(outFlag, curNode);
-                output.collect(curID, outputValue);
-                return;
-            }
+            // decide where we're going to merge to
             if (hasNext || hasPrev) {
                 if (curHead) {
                     if (hasNext && !nextHead) {
-                        // compress this head to the forward tail
-                        outFlag |= MessageFlag.FROM_PREDECESSOR;
-                        outputValue.set(outFlag, curNode);
-                        output.collect(nextID, outputValue);
-                        willMerge = true;
+                        // merge forward
+                        mergeMsgFlag |= nextDir;
+                        mergeDir = MergeDir.FORWARD;
                     } else if (hasPrev && !prevHead) {
-                        // compress this head to the reverse tail
-                        outFlag |= MessageFlag.FROM_SUCCESSOR;
-                        outputValue.set(outFlag, curNode);
-                        output.collect(prevID, outputValue);
-                        willMerge = true;
+                        // merge backwards
+                        mergeMsgFlag |= prevDir;
+                        mergeDir = MergeDir.BACKWARD;
                     }
                 } else {
                     // I'm a tail
@@ -167,58 +202,242 @@
                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
-                            outFlag |= MessageFlag.FROM_PREDECESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(nextID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= nextDir;
+                            mergeDir = MergeDir.FORWARD;
                         }
                     } else if (!hasPrev) {
                         // no previous node
                         if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
-                            outFlag |= MessageFlag.FROM_PREDECESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(nextID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= nextDir;
+                            mergeDir = MergeDir.FORWARD;
                         }
                     } else if (!hasNext) {
                         // no next node
                         if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
-                            outFlag |= MessageFlag.FROM_SUCCESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(prevID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= prevDir;
+                            mergeDir = MergeDir.BACKWARD;
                         }
                     }
                 }
             }
 
-            // if we didn't send ourselves to some other node, remap ourselves for the next round
-            if (!willMerge) {
-                outFlag |= MessageFlag.FROM_SELF;
-                outputValue.set(outFlag, curNode);
-                output.collect(curID, outputValue);
+            if (mergeDir == MergeDir.NO_MERGE) {
+                mergeMsgFlag |= MessageFlag.MSG_SELF;
+                mergeMsgValue.set(mergeMsgFlag, curNode);
+                output.collect(curID, mergeMsgValue);
+            } else {
+                // this node will do a merge next round
+                mergeMsgFlag |= MessageFlag.MSG_UPDATE_MERGE;
+                mergeMsgValue.set(mergeMsgFlag, curNode);
+                output.collect(curID, mergeMsgValue);
+
+                sendUpdateToNeighbors(curNode, (byte) (mergeMsgFlag & MessageFlag.DIR_MASK), output);
             }
-            else {
-                // TODO send update to this node's neighbors
-                //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+        }
+
+        /*
+         * when performing a merge, an update message needs to be sent to my neighbors
+         */
+        private void sendUpdateToNeighbors(NodeWritable node, byte mergeDir,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+            PositionWritable mergeSource = node.getNodeID();
+            PositionWritable mergeTarget = node.getListFromDir(mergeDir).getPosition(0);
+
+            // I need to notify the neighbors on the opposite side from the direction I'm merging
+            Iterator<PositionWritable> posIterator1;
+            byte dir1;
+            Iterator<PositionWritable> posIterator2;
+            byte dir2;
+            switch (mergeDir) {
+                case MessageFlag.DIR_FF:
+                case MessageFlag.DIR_FR:
+                    // merging forward; tell my previous neighbors
+                    posIterator1 = node.getRRList().iterator();
+                    dir1 = MessageFlag.DIR_RR;
+                    posIterator2 = node.getRFList().iterator();
+                    dir2 = MessageFlag.DIR_RF;
+                    break;
+                case MessageFlag.DIR_RF:
+                case MessageFlag.DIR_RR:
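+                    // merging backward; tell my next neighbors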
+                    posIterator1 = node.getFFList().iterator();
+                    dir1 = MessageFlag.DIR_FF;
+                    posIterator2 = node.getFRList().iterator();
+                    dir2 = MessageFlag.DIR_FR;
+                    break;
+                default:
+                    throw new IOException("Unrecognized direction in sendUpdateToNeighbors: " + mergeDir);
+            }
+            while (posIterator1.hasNext()) {
+                updateMsgValue.setAsUpdateMessage(mergeDir, dir1, mergeSource, mergeTarget);
+                collector.collect(posIterator1.next(), updateMsgValue);
+            }
+            while (posIterator2.hasNext()) {
+                updateMsgValue.setAsUpdateMessage(mergeDir, dir2, mergeSource, mergeTarget);
+                collector.collect(posIterator2.next(), updateMsgValue);
             }
         }
     }
 
     /*
-     * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
+     * Reducer class: processes the update messages from H4UpdatesMapper
      */
-    private static class MergePathsH4Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-        private MultipleOutputs mos;
-        public static final String COMPLETE_OUTPUT = "complete";
-        public static final String UPDATES_OUTPUT = "update";
-        
+    private static class H4UpdatesReducer extends MapReduceBase implements
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
+        private NodeWritable curNode;
+        private PositionWritable outPosn;
+        private ArrayList<NodeWithFlagWritable> updateMsgs;
+        private boolean sawCurNode;
+        private byte outFlag;
+        private byte inFlag;
+
+        public void configure(JobConf conf) {
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
+            curNode = new NodeWritable(KMER_SIZE);
+            outPosn = new PositionWritable();
+            updateMsgs = new ArrayList<NodeWithFlagWritable>();
+        }
+
+        /*
+         * Process updates from mapper
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            sawCurNode = false;
+            updateMsgs.clear();
+            
+            byte inDir;
+            while (values.hasNext()) {
+                inputValue.set(values.next());
+                inFlag = inputValue.getFlag();
+                inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
+                
+                switch (inDir) {
+                    case MessageFlag.MSG_UPDATE_MERGE:
+                    case MessageFlag.MSG_SELF:
+                        if (sawCurNode)
+                            throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
+                                    + "  current self: " + inputValue.getNode());
+                        curNode.set(inputValue.getNode());
+                        outFlag = inFlag;
+                        sawCurNode = true;
+                        if (inDir == MessageFlag.MSG_SELF) {
+                            outPosn.set(curNode.getNodeID());
+                        } else {  // MSG_UPDATE_MERGE
+                            // merge messages are sent to their merge recipient
+                            outPosn.set(curNode.getListFromDir(inFlag).getPosition(0));
+                        }
+                        break;
+                    case MessageFlag.MSG_UPDATE_EDGE:
+                        updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue, not a reference!
+                        break;
+                    default:
+                        throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
+                }
+            }
+
+            // process all the update messages for this node
+            // TODO: this buffers every update message in memory; look for a more efficient approach
+            for (NodeWithFlagWritable updateMsg : updateMsgs) {
+                NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+            }
+            outputValue.set(outFlag, curNode);
+            output.collect(outPosn, outputValue);
+        }
+    }
+    
+
+     * Mapper class: sends the update messages to their (already decided) destination
+     */
+    public static class H4MergeMapper extends MapReduceBase implements
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+        private static long randSeed;
+        private Random randGenerator;
+        private float probBeingRandomHead;
+
+        private int KMER_SIZE;
+        private NodeWithFlagWritable outputValue;
+        private NodeWithFlagWritable mergeMsgValue;
+        private NodeWithFlagWritable updateMsgValue;
+
+        private NodeWritable curNode;
+        private PositionWritable curID;
+        private PositionWritable nextID;
+        private PositionWritable prevID;
+        private boolean hasNext;
+        private boolean hasPrev;
+        private boolean curHead;
+        private boolean nextHead;
+        private boolean prevHead;
+        private MergeDir mergeDir;
+        private byte inFlag;
+        private byte headFlag;
+        private byte tailFlag;
+        private byte mergeMsgFlag;
+        private byte nextDir;
+        private byte prevDir;
+
+        public void configure(JobConf conf) {
+
+            randSeed = conf.getLong("randomSeed", 0);
+            randGenerator = new Random(randSeed);
+            probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+            mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+            updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
+            curNode = new NodeWritable(KMER_SIZE);
+            curID = new PositionWritable();
+            nextID = new PositionWritable();
+            prevID = new PositionWritable();
+        }
+
+        @Override
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            inFlag = value.getFlag();
+            curNode.set(value.getNode());
+            curID.set(curNode.getNodeID());
+            
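+            // TODO: per step 2 in the class comment, send this node to its merge
+            // recipient so the kmers can be combined in the merge reducer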
+        }
+
+    }
+
+    /*
+     * Reducer class: performs the actual merge, combining the nodes that were sent to their merge targets
+     */
+    private static class H4MergeReducer2 extends MapReduceBase implements
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+        private MultipleOutputs mos;
+        private static final String TO_MERGE_OUTPUT = "toMerge";
+        private static final String COMPLETE_OUTPUT = "complete";
+        private static final String UPDATES_OUTPUT = "update";
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
+
+        private int KMER_SIZE;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private NodeWritable prevNode;
         private NodeWritable nextNode;
@@ -231,8 +450,8 @@
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
@@ -240,19 +459,22 @@
 
         @SuppressWarnings("unchecked")
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+            updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
-                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+                if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+                            && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
                         // complete path (H & T meet in this node)
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
-                    } else { 
+                        completeCollector.collect(key, inputValue);
+                    } else {
                         // FROM_SELF => no merging this round. remap self
-                        output.collect(key, inputValue);
+                        toMergeCollector.collect(key, inputValue);
                     }
                 } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
                     // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
@@ -274,7 +496,7 @@
                     } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         nextNode.set(inputValue.getNode());
                         sawNextNode = true;
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
                         curNode.set(inputValue.getNode());
                         sawCurNode = true;
                     } else {
@@ -304,54 +526,87 @@
                     curNode.mergeForwardPre(prevNode, KMER_SIZE);
                     reporter.incrCounter("genomix", "num_merged", 1);
                 }
-                
+
                 outputValue.set(outFlag, curNode);
                 if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
-                    mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+                    completeCollector.collect(key, outputValue);
                 } else {
-                    output.collect(key, outputValue);
+                    toMergeCollector.collect(key, outputValue);
                 }
             }
         }
+
+        public void close() throws IOException {
+            mos.close();
+        }
     }
 
     /*
      * Run one iteration of the mergePaths algorithm
      */
-    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
+            JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
 
-        FileInputFormat.addInputPath(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        FileInputFormat.addInputPaths(conf, inputPath);
+        Path outputPath = new Path(inputPath + ".h4merge.tmp");
+        FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
-        conf.setMapperClass(MergePathsH4Mapper.class);
-        conf.setReducerClass(MergePathsH4Reducer.class);
+        conf.setMapperClass(H4UpdatesMapper.class);
+        conf.setReducerClass(H4UpdatesReducer.class);
 
-        FileSystem.get(conf).delete(new Path(outputPath), true);
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
 
-        return JobClient.runJob(conf);
+        FileSystem dfs = FileSystem.get(conf);
+        // clean output dirs
+        dfs.delete(outputPath, true);
+        dfs.delete(new Path(toMergeOutput), true);
+        dfs.delete(new Path(completeOutput), true);
+        dfs.delete(new Path(updatesOutput), true);
+
+        RunningJob job = JobClient.runJob(conf);
+
+        // move the tmp outputs to the arg-spec'ed dirs. If a rename fails (nothing was written
+        // to that named output), create an empty dir to simplify downstream processing
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
+                toMergeOutput))) {
+            dfs.mkdirs(new Path(toMergeOutput));
+        }
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
+                completeOutput))) {
+            dfs.mkdirs(new Path(completeOutput));
+        }
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
+                new Path(updatesOutput))) {
+            dfs.mkdirs(new Path(updatesOutput));
+        }
+
+        return job;
     }
 
     @Override
-    public int run(String[] arg0) throws Exception {
-        // TODO Auto-generated method stub
-        return 0;
+    public int run(String[] args) throws Exception {
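+        // FIXME: main() dispatches through ToolRunner, and ToolRunner.run calls back into this
+        // run(String[]) on the new instance, so this recurses forever; argument parsing belongs here instead.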
+        int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
+        return res;
     }
 
     public static void main(String[] args) throws Exception {
         int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
-        System.out.println("Ran the job fine!");
         System.exit(res);
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 155b999..72be4b5 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -16,16 +16,34 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+
 @SuppressWarnings("deprecation")
 public class MergePathsH4Driver {
 
+    private static final String TO_MERGE = "toMerge";
+    private static final String COMPLETE = "complete";
+    private static final String UPDATES = "updates";
+    private String mergeOutput;
+    private String completeOutput;
+    private String updatesOutput;
+
+    private void setOutputPaths(String basePath, int mergeIteration) {
+        basePath = basePath.replaceAll("/$", ""); // strip trailing slash
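+        // e.g. basePath "graph" at iteration 2 yields "graph_toMerge_i2",
+        // "graph_complete_i2", and "graph_updates_i2"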
+        mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+        completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
+        updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
+    }
+
     private static class Options {
         @Option(name = "-inputpath", usage = "the input path", required = true)
         public String inputPath;
@@ -44,13 +62,19 @@
 
         @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
         public int mergeRound;
-        
+
         @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
         public String hadoopConf;
 
     }
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+    /*
+     * Main driver for path merging. Given a graph, this driver runs
+     * PathNodeInitial to identify heads and tails, then runs up to @mergeRound
+     * iterations of path merging. Updates during the merge are batch-processed
+     * at the end in a final update job.
+     */
+    public void run(String inputGraphPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
             String defaultConfPath, JobConf defaultConf) throws IOException {
         JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
         if (defaultConfPath != null) {
@@ -58,40 +82,71 @@
         }
         baseConf.setNumReduceTasks(numReducers);
         baseConf.setInt("sizeKmer", sizeKmer);
-
         FileSystem dfs = FileSystem.get(baseConf);
-        String prevOutput = inputPath;
-        dfs.delete(new Path(outputPath), true); // clear any previous output
 
-        String tmpOutputPath = "NO_JOBS_DONE";
-        boolean finalMerge = false;
-        for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
-            baseConf.setInt("iMerge", iMerge);
-            baseConf.setBoolean("finalMerge", finalMerge);
-            MergePathsH4 merger = new MergePathsH4();
-            tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
-            RunningJob job = merger.run(prevOutput, tmpOutputPath, baseConf);
-            if (job.getCounters().findCounter("genomix", "num_merged").getValue() == 0) {
-                if (!finalMerge) {
-                    // all of the pseudoheads have found each other.  H3 now behaves like H1
-                    finalMerge = true;
-                } else {
-                    // already in final merge stage and all paths were merged before.  We're done!
-                    break;
-                }
+        int iMerge = 0;
+
+        // identify head and tail nodes with pathnode initial
+        PathNodeInitial inith4 = new PathNodeInitial();
+        setOutputPaths(inputGraphPath, iMerge);
+        String prevToMergeOutput = inputGraphPath;
+        System.out.println("initial run.  toMerge: " + mergeOutput + ", complete: " + completeOutput);
+        inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+        dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
+        dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
+
+        // several iterations of merging
+        MergePathsH4 merger = new MergePathsH4();
+        for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
+            prevToMergeOutput = mergeOutput;
+            setOutputPaths(inputGraphPath, iMerge);
+            merger.run(prevToMergeOutput, mergeOutput, completeOutput, updatesOutput, baseConf);
+            // local debug copies of this iteration's outputs
+            dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge + "-toMerge"));
+            dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge + "-complete"));
+            dfs.copyToLocalFile(new Path(updatesOutput), new Path("i" + iMerge + "-updates"));
+            
+            FileStatus[] mergeStatus = dfs.listStatus(new Path(mergeOutput));
+            if (mergeStatus == null || mergeStatus.length == 0) {
+                // no output from previous run-- we are done!
+                break;
             }
         }
-        dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+        
+        // finally, combine all the completed paths and update messages to
+        // create a single merged graph output
+        dfs.delete(new Path(outputGraphPath), true); // clear any previous output
+        // use all the "complete" and "update" outputs in addition to the final
+        // (possibly empty) toMerge directories
+        // as input to the final update step. This builds a comma-delim'ed
+        // String of said files.
+        final String lastMergeOutput = mergeOutput;
+        PathFilter updateFilter = new PathFilter() {
+            @Override
+            public boolean accept(Path arg0) {
+                String path = arg0.toString();
+                System.out.println("equals last: " + path + " vs " + lastMergeOutput + " = " + path.endsWith(lastMergeOutput));
+                return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
+            }
+        };
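+        // The glob matches sibling dirs of the input path; the filter keeps every _complete_iN
+        // and _updates_iN directory plus the last (possibly empty) toMerge directory.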
+        StringBuilder sb = new StringBuilder();
+        String delim = "";
+        for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$", "") + "*"), updateFilter)) {
+            sb.append(delim).append(file.getPath());
+            delim = ",";
+        }
+        String finalInputs = sb.toString();
+        System.out.println("This is the final sacrifice: " + finalInputs);
+        // TODO run the update iteration
     }
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+    public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
             String defaultConfPath) throws IOException {
-        run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+        run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
     }
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+    public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
             JobConf defaultConf) throws IOException {
-        run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+        run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 6518532..198c769 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -23,10 +23,10 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.H4UpdatesMapper;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
@@ -37,22 +37,22 @@
      * Mapper class: removes any tips by not mapping them at all
      */
     private static class RemoveTipsMapper extends MapReduceBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private int KMER_SIZE;
         private int removeTipsMinLength;
 
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
 
         public void configure(JobConf conf) {
             removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
+            KMER_SIZE = conf.getInt("sizeKmer", 0); // KMER_SIZE was never set here, leaving outputValue and curNode sized 0
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
             curNode.set(value.getNode());
             if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
@@ -60,7 +60,7 @@
                 // kill this node by NOT mapping it.  Update my neighbors with a suicide note
                 //TODO: update neighbors by removing me from its list
             } else {
-                outputValue.set(MessageFlag.FROM_SELF, curNode);
+                outputValue.set(MergeMessageFlag.MSG_SELF, curNode);
-                output.collect(key, value);
+                output.collect(key, outputValue); // emit the re-flagged value rather than the raw input
             }
         }
@@ -70,11 +70,11 @@
      * Reducer class: keeps mapped nodes 
      */
     private static class MergePathsH4Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private NodeWritable prevNode;
         private NodeWritable nextNode;
@@ -86,20 +86,20 @@
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE); // inputValue was never initialized; reduce() would hit an NPE
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
         }
 
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
                 throws IOException {
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) > 0) {
                     // MSG_SELF => keep self
                     output.collect(key, inputValue);
                 } else {
@@ -126,11 +126,11 @@
         conf.setOutputFormat(SequenceFileOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
-        conf.setMapperClass(MergePathsH4Mapper.class);
+        conf.setMapperClass(RemoveTipsMapper.class); // use this job's own mapper rather than H4's
         conf.setReducerClass(MergePathsH4Reducer.class);
 
         FileSystem.get(conf).delete(new Path(outputPath), true);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
deleted file mode 100644
index f05797e..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-/*
- * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
- * This class is used as the value in several MapReduce algorithms.
- */
-public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
-    private byte flag;
-    private NodeWritable node;
-
-    public MessageWritableNodeWithFlag() {
-        this(0);
-    }
-
-    public MessageWritableNodeWithFlag(int k) {
-        this.flag = 0;
-        this.node = new NodeWritable(k);
-    }
-
-    public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
-        this.flag = flag;
-        this.node = new NodeWritable(kmerSize);
-    }
-    
-    public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
-        this(node.getKmer().getKmerLength());
-        set(flag, node);
-    }
-
-    public void set(MessageWritableNodeWithFlag right) {
-        set(right.getFlag(), right.getNode());
-    }
-
-    public void set(byte flag, NodeWritable node) {
-        this.node.set(node);
-        this.flag = flag;
-    }
-
-    @Override
-    public void readFields(DataInput arg0) throws IOException {
-        node.readFields(arg0);
-        flag = arg0.readByte();
-    }
-
-    @Override
-    public void write(DataOutput arg0) throws IOException {
-        node.write(arg0);
-        arg0.writeByte(flag);
-    }
-
-    public NodeWritable getNode() {
-        if (node.getCount() != 0) {
-            return node;
-        }
-        return null;
-    }
-
-    public byte getFlag() {
-        return this.flag;
-    }
-
-    public String toString() {
-        return node.toString() + '\t' + String.valueOf(flag);
-    }
-
-    @Override
-    public byte[] getBytes() {
-        if (node.getCount() != 0) {
-            return node.getKmer().getBytes();
-        } else
-            return null;
-    }
-
-    @Override
-    public int getLength() {
-        return node.getCount();
-    }
-
-    @Override
-    public int hashCode() {
-//        return super.hashCode() + flag + node.hashCode();
-        return flag + node.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object rightObj) {
-        if (rightObj instanceof MessageWritableNodeWithFlag) {
-            MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
-            return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
-        }
-        return false;
-    }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
new file mode 100644
index 0000000..a7e8157
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -0,0 +1,261 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
+ * This class is used as the value in several MapReduce algorithms.
+ */
+public class NodeWithFlagWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
+    private byte flag;
+    private NodeWritable node;
+
+    public static class MessageFlag extends DirectionFlag {
+        public static final byte EMPTY_MESSAGE = 0;
+        // message types
+        public static final byte MSG_SELF = 0b01 << 2;
+        public static final byte MSG_UPDATE_MERGE = 0b10 << 2;
+        public static final byte MSG_UPDATE_EDGE = 0b11 << 2;
+        public static final byte MSG_MASK = 0b11 << 2;
+        // additional info
+        public static final byte IS_HEAD = 0b1 << 4;
+        public static final byte IS_TAIL = 0b1 << 5;
+        // extra bit used differently in each operation
+        public static final byte EXTRA_FLAG = 1 << 6;
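+        // Resulting byte layout, per the constants above: bits 0-1 = direction (DirectionFlag),
+        // bits 2-3 = message type (MSG_*), bit 4 = IS_HEAD, bit 5 = IS_TAIL, bit 6 = EXTRA_FLAG;
+        // e.g. (byte) (MSG_SELF | IS_HEAD) == 0b010100.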
+    }
+
+    public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete, PositionWritable nodeToAdd) {
+        byte neighborToMeDir = mirrorDirection(neighborDir);
+        byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
+
+        // clear previous kmer and edge data 
+        node.reset(0);
+
+        // indicate the node to delete
+        setFlag((byte) (MessageFlag.MSG_UPDATE_EDGE | neighborToMeDir));
+        node.getNodeID().set(nodeToDelete);
+
+        // add the new node to the appropriate list
+        node.getListFromDir(neighborToMergeDir).append(nodeToAdd);
+    }
+
+
+    /*
+     * Returns the edge dir for B->A when the A->B edge is type @dir
+     */
+    public byte mirrorDirection(byte dir) {
+        switch (dir) {
+            case MessageFlag.DIR_FF:
+                return MessageFlag.DIR_RR;
+            case MessageFlag.DIR_FR:
+                return MessageFlag.DIR_FR;
+            case MessageFlag.DIR_RF:
+                return MessageFlag.DIR_RF;
+            case MessageFlag.DIR_RR:
+                return MessageFlag.DIR_FF;
+            default:
+                throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+        }
+    }
+
+    /*
+     * When the A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge,
+     * returns the new edge type for A->C. A flipping merge (FR or RF) flips the neighbor's edge
+     * type; e.g. neighborDir=FF with mergeDir=FR yields FR.
+     */
+    public byte flipDirection(byte neighborDir, byte mergeDir) {
+        switch (mergeDir) {
+
+            case MessageFlag.DIR_FF:
+            case MessageFlag.DIR_RR:
+                // no change since the merging node didn't flip
+                return neighborDir;
+
+            case MessageFlag.DIR_FR:
+            case MessageFlag.DIR_RF:
+                // merging node is flipping; my edge type must also flip
+                switch (neighborDir) {
+                    case MessageFlag.DIR_FF:
+                        return MessageFlag.DIR_FR;
+                    case MessageFlag.DIR_FR:
+                        return MessageFlag.DIR_FF;
+                    case MessageFlag.DIR_RF:
+                        return MessageFlag.DIR_RR;
+                    case MessageFlag.DIR_RR:
+                        return MessageFlag.DIR_RF;
+                    default:
+                        throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+                }
+
+            default:
+                throw new RuntimeException("Unrecognized direction for mergeDir: " + mergeDir);
+        }
+    }
+
+    /*
+     * Process any changes to @node contained in @updateMsg.  This includes merges and edge updates
+     */
+    public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
+            throws IOException {
+        byte updateFlag = updateMsg.getFlag();
+        NodeWritable updateNode = updateMsg.getNode();
+        if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
+            // this message wants to update the edges of node.
+            // remove position and merge its position lists with node
+            if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
+                // need to remove updateNode from the specified PositionList
+                switch (updateFlag & MessageFlag.DIR_MASK) {
+                    case MessageFlag.DIR_FF:
+                        node.getFFList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_FR:
+                        node.getFRList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_RF:
+                        node.getRFList().remove(updateNode.getNodeID());
+                        break;
+                    case MessageFlag.DIR_RR:
+                        node.getRRList().remove(updateNode.getNodeID());
+                        break;
+                    default:
+                        throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+                }
+            }
+            // now merge positionlists from update and node
+            node.getFFList().appendList(updateNode.getFFList());
+            node.getFRList().appendList(updateNode.getFRList());
+            node.getRFList().appendList(updateNode.getRFList());
+            node.getRRList().appendList(updateNode.getRRList());
+        } else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
+            // this message wants to merge node with updateNode.
+            // the direction flag indicates how the merge should take place.
+            // TODO send update or remove edge that I merge with
+            switch (updateFlag & MessageFlag.DIR_MASK) {
+                case MessageFlag.DIR_FF:
+                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+                    node.getFFList().set(updateNode.getFFList());
+                    // TODO not just FF list here-- FR as well
+                    break;
+                case MessageFlag.DIR_FR:
+                    // FIXME not sure if this should be reverse-complement or just reverse...
+                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+                    node.getFRList().set(updateNode.getFRList());
+                    break;
+                case MessageFlag.DIR_RF:
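+                    // TODO: RF merge is not implemented; this case currently changes nothing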
+
+                    break;
+                case MessageFlag.DIR_RR:
+                    node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+                    node.getRRList().set(updateNode.getRRList());
+                    break;
+                default:
+                    throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+            }
+        }
+    }
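+    // Typical reducer usage (variable names are hypothetical):
+    //   NodeWithFlagWritable.processUpdates(nodeToKeep, inputMsg, KMER_SIZE);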
+
+    public NodeWithFlagWritable() {
+        this(0);
+    }
+
+    public NodeWithFlagWritable(int k) {
+        this.flag = 0;
+        this.node = new NodeWritable(k);
+    }
+
+    public NodeWithFlagWritable(byte flag, int kmerSize) {
+        this.flag = flag;
+        this.node = new NodeWritable(kmerSize);
+    }
+
+    public NodeWithFlagWritable(byte flag, NodeWritable node) {
+        this(node.getKmer().getKmerLength());
+        set(flag, node);
+    }
+
+    public NodeWithFlagWritable(NodeWithFlagWritable other) {
+        this(other.flag, other.node);
+    }
+
+    public void set(NodeWithFlagWritable right) {
+        set(right.getFlag(), right.getNode());
+    }
+
+    public void set(byte flag, NodeWritable node) {
+        this.node.set(node);
+        this.flag = flag;
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+        node.readFields(arg0);
+        flag = arg0.readByte();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+        node.write(arg0);
+        arg0.writeByte(flag);
+    }
+
+    public NodeWritable getNode() {
+        if (node.getCount() != 0) {
+            return node;
+        }
+        return null;
+    }
+
+    public byte getFlag() {
+        return this.flag;
+    }
+
+    public void setFlag(byte flag) {
+        this.flag = flag;
+    }
+
+    public String toString() {
+        return node.toString() + '\t' + String.valueOf(flag);
+    }
+
+    @Override
+    public byte[] getBytes() {
+        if (node.getCount() != 0) {
+            return node.getKmer().getBytes();
+        } else
+            return null;
+    }
+
+    @Override
+    public int getLength() {
+        return node.getCount();
+    }
+
+    @Override
+    public int hashCode() {
+        //        return super.hashCode() + flag + node.hashCode();
+        return flag + node.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object rightObj) {
+        if (rightObj instanceof NodeWithFlagWritable) {
+            NodeWithFlagWritable rightMessage = (NodeWithFlagWritable) rightObj;
+            return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 497e926..3c46dc7 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.genomix.hadoop.pmcommon;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -33,228 +34,270 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
 /*
  * A map-reduce job to find all nodes that are part of a simple path and then mark the nodes that
- * form their heads and tails.
+ * form their heads and tails; it also identifies the parts of the graph that will participate in a path merge.
+ * 
+ * This MR job uses MultipleOutputs rather than remapping the entire graph each iteration:
+ *   1. simple path nodes (indegree = outdegree = 1) (TO_MERGE_OUTPUT collector)
+ *   2. non-path, "complete" nodes, which will not be affected by the path merging (COMPLETE_OUTPUT collector)
+ *   3. non-path, "possibly updated" nodes, whose edges need to be updated after the merge (TO_UPDATE_OUTPUT collector)
  */
 @SuppressWarnings("deprecation")
 public class PathNodeInitial extends Configured implements Tool {
 
+    public static final String COMPLETE_OUTPUT = "complete";
+    public static final String TO_MERGE_OUTPUT = "toMerge";
+    public static final String TO_UPDATE_OUTPUT = "toUpdate";
+
+    private static final byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // EXTRA_FLAG bit repurposed here to mark neighbors of path nodes
+
+    public static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+            OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+        Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+        posIterator = node.getFRList().iterator(); // FRList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+    }
+
+    public static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+            OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+        Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+        posIterator = node.getRFList().iterator(); // RFList
+        while (posIterator.hasNext()) {
+            collector.collect(posIterator.next(), outputValue);
+        }
+    }
+
     public static class PathNodeInitialMapper extends MapReduceBase implements
-            Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+            Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
 
         private int KMER_SIZE;
         private PositionWritable outputKey;
-        private MessageWritableNodeWithFlag outputValue;
+        private NodeWithFlagWritable outputValue;
         private int inDegree;
         private int outDegree;
-        private NodeWritable emptyNode;
-        private Iterator<PositionWritable> posIterator;
+        private boolean pathNode;
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
-            emptyNode = new NodeWritable();
         }
 
+        /*
+         * Identify the heads and tails of simple path nodes and their neighbors
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
         @Override
         public void map(NodeWritable key, NullWritable value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
             inDegree = key.inDegree();
             outDegree = key.outDegree();
             if (inDegree == 1 && outDegree == 1) {
-                // simple path nodes map themselves
-                outputValue.set(MessageFlag.FROM_SELF, key);
-                output.collect(key.getNodeID(), outputValue);
-                reporter.incrCounter("genomix", "path_nodes", 1);
+                pathNode = true;
             } else if (inDegree == 0 && outDegree == 1) {
+                pathNode = true;
                 // start of a tip.  needs to merge & be marked as head
-                outputValue.set(MessageFlag.FROM_SELF, key);
-                output.collect(key.getNodeID(), outputValue);
-                reporter.incrCounter("genomix", "path_nodes", 1);
-
-                outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+                outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
                 output.collect(key.getNodeID(), outputValue);
             } else if (inDegree == 1 && outDegree == 0) {
+                pathNode = true;
                 // end of a tip.  needs to merge & be marked as tail
-                outputValue.set(MessageFlag.FROM_SELF, key);
-                output.collect(key.getNodeID(), outputValue);
-                reporter.incrCounter("genomix", "path_nodes", 1);
-
-                outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+                outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
                 output.collect(key.getNodeID(), outputValue);
             } else {
+                pathNode = false;
                 if (outDegree > 0) {
                     // Not a path myself, but my successor might be one. Map forward successor to find heads
-                    outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
-                    posIterator = key.getFFList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                    posIterator = key.getFRList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
+                    outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+                    sendOutputToNextNeighbors(key, outputValue, output);
                 }
                 if (inDegree > 0) {
                     // Not a path myself, but my predecessor might be one. map predecessor to find tails 
-                    outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
-                    posIterator = key.getRRList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
-                    posIterator = key.getRFList().iterator();
-                    while (posIterator.hasNext()) {
-                        outputKey.set(posIterator.next());
-                        output.collect(outputKey, outputValue);
-                    }
+                    outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+                    sendOutputToPreviousNeighbors(key, outputValue, output);
                 }
-                // push this non-path node to the "complete" output
-                outputValue.set((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), key);
+                // this non-path node won't participate in the merge. Mark as "complete" (H + T)
+                outputValue.set((byte) (MessageFlag.MSG_SELF | MessageFlag.IS_HEAD | MessageFlag.IS_TAIL), key);
                 output.collect(key.getNodeID(), outputValue);
             }
+
+            if (pathNode) {
+                // simple path nodes map themselves
+                outputValue.set(MessageFlag.MSG_SELF, key);
+                output.collect(key.getNodeID(), outputValue);
+                reporter.incrCounter("genomix", "path_nodes", 1);
+
+                // also mark neighbors of paths (they are candidates for updates)
+                outputValue.set(NEAR_PATH, NodeWritable.EMPTY_NODE);
+                sendOutputToNextNeighbors(key, outputValue, output);
+                sendOutputToPreviousNeighbors(key, outputValue, output);
+            }
         }
     }
 
     public static class PathNodeInitialReducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
-        private static final String COMPLETE_OUTPUT = "complete";
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+        private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
         private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
+
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
         private NodeWritable nodeToKeep;
-        private int count;
-        private byte flag;
-        private boolean isComplete;
+        private byte outputFlag;
+        private byte inputFlag;
+        private boolean sawSelf;
 
         public void configure(JobConf conf) {
             mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
-            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
             nodeToKeep = new NodeWritable(KMER_SIZE);
         }
 
+        /*
+         * Segregate nodes into three bins:
+         *   1. mergeable nodes (maybe marked H or T)
+         *   2. non-mergeable nodes that are candidates for updates
+         *   3. non-mergeable nodes that are not path neighbors and won't be updated
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
         @SuppressWarnings("unchecked")
         @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
                 throws IOException {
+            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+            toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
 
-            inputValue.set(values.next());
-            if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
-                    if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
-                        // non-path node.  Store in "complete" output
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+            outputFlag = MessageFlag.EMPTY_MESSAGE;
+            sawSelf = false;
+            while (values.hasNext()) {
+                inputValue.set(values.next());
+                inputFlag = inputValue.getFlag();
+                outputFlag |= inputFlag;
+
+                if ((inputFlag & MessageFlag.MSG_SELF) > 0) {
+                    // SELF -> keep this node
+                    if (sawSelf) {
+                        throw new IOException("Already saw SELF node in PathNodeInitialReducer! previous self: "
+                                + nodeToKeep.toString() + ". current self: " + inputValue.getNode().toString());
+                    }
+                    sawSelf = true;
+                    nodeToKeep.set(inputValue.getNode());
+                }
+            }
+
+            if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+                if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+                    // non-path or single path nodes
+                    if ((outputFlag & NEAR_PATH) > 0) {
+                        // non-path, but an update candidate
+                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+                        toUpdateCollector.collect(key, outputValue);
                     } else {
-                        // FROM_SELF => need to keep this PATH node
-                        output.collect(key, inputValue);
+                        // non-path or single-node path.  Store in "complete" output
+                        outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+                        completeCollector.collect(key, outputValue);
+                    }
+                } else {
+                    // path nodes that are mergeable
+                    outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+                    outputValue.set(outputFlag, nodeToKeep);
+                    toMergeCollector.collect(key, outputValue);
+
+                    reporter.incrCounter("genomix", "path_nodes", 1);
+                    if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                    }
+                    if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
                     }
                 }
             } else {
-                // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node 
-                count = 0;
-                flag = MessageFlag.EMPTY_MESSAGE;
-                isComplete = false;
-                while (true) { // process values; break when no more
-                    count++;
-                    if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
-                        // SELF -> keep this node
-                        flag |= MessageFlag.FROM_SELF;
-                        nodeToKeep.set(inputValue.getNode());
-                        if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
-                            isComplete = true;
-                        }
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
-                        flag |= MessageFlag.IS_TAIL;
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
-                        flag |= MessageFlag.IS_HEAD;
-                    }
-                    if (!values.hasNext()) {
-                        break;
-                    } else {
-                        inputValue.set(values.next());
-                    }
-                }
-                if (count < 2) {
-                    throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
-                            + String.valueOf(count));
-                }
-                if ((flag & MessageFlag.FROM_SELF) > 0) {
-                    if ((flag & MessageFlag.IS_COMPLETE) > 0) {
-                        // non-path node.  Store in "complete" output
-                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
-                    } else {
-                        // only keep simple path nodes
-                        outputValue.set(flag, nodeToKeep);
-                        output.collect(key, outputValue);
-
-                        reporter.incrCounter("genomix", "path_nodes", 1);
-                        if ((flag & MessageFlag.IS_HEAD) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_heads", 1);
-                        }
-                        if ((flag & MessageFlag.IS_TAIL) > 0) {
-                            reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                        }
-                    }
-                } else {
-                    throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
-                }
+                throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
             }
         }
+
+        public void close() throws IOException {
+            mos.close();
+        }
     }
 
     /*
      * Mark the head, tail, and simple path nodes in one map-reduce job.
      */
-    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+            JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(PathNodeInitial.class);
         conf.setJobName("PathNodeInitial " + inputPath);
 
-        FileInputFormat.addInputPath(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        FileInputFormat.addInputPaths(conf, inputPath);
+        Path outputPath = new Path(toMergeOutput);
+        FileOutputFormat.setOutputPath(conf, outputPath);
 
         conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        conf.setOutputFormat(NullOutputFormat.class);
 
         conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setMapOutputValueClass(NodeWithFlagWritable.class);
         conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(PathNodeInitialMapper.class);
         conf.setReducerClass(PathNodeInitialReducer.class);
 
-        FileSystem.get(conf).delete(new Path(outputPath), true);
+        MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
+        MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+                PositionWritable.class, NodeWithFlagWritable.class);
 
-        return JobClient.runJob(conf);
+        FileSystem dfs = FileSystem.get(conf);
+        dfs.delete(outputPath, true); // clean output dir
+        RunningJob job = JobClient.runJob(conf);
+
+        // move the tmp outputs to the arg-spec'ed dirs. If a rename fails (nothing was written
+        // to that named output), create an empty dir to simplify downstream processing
+        if (!dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput))) {
+            dfs.mkdirs(new Path(completeOutput));
+        }
+        if (!dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput))) {
+            dfs.mkdirs(new Path(toUpdateOutput));
+        }
+
+        return job;
     }
 
     @Override
-    public int run(String[] arg0) throws Exception {
-        // TODO Auto-generated method stub
-        return 0;
+    public int run(String[] args) throws Exception {
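+        // FIXME: ToolRunner.run calls back into run(String[]) on the fresh instance, so this
+        // recurses forever; argument parsing belongs here instead.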
+        int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+        return res;
     }
 
     public static void main(String[] args) throws Exception {
-        int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+        int res = new PathNodeInitial().run(args);
         System.exit(res);
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
index c735a0d..4040cab 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -53,14 +53,14 @@
             boolean onlyTest1stJob, boolean seqOutput, String defaultConfPath) throws IOException {
         if (onlyTest1stJob == true) {
             
-            runfirstjob(inputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
+            runfirstjob(inputPath, outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
         } else {
-            runfirstjob(inputPath, numReducers, sizeKmer, readLength, true, defaultConfPath);
-            runsecondjob(inputPath, outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
+            runfirstjob(inputPath, inputPath + "-tmp", numReducers, sizeKmer, readLength, true, defaultConfPath);
+            runsecondjob(inputPath + "-tmp", outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
         }
     }
 
-    public void runfirstjob(String inputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput,
+    public void runfirstjob(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput,
             String defaultConfPath) throws IOException {
         JobConf conf = new JobConf(GraphBuildingDriver.class);
         conf.setInt("sizeKmer", sizeKmer);
@@ -85,14 +85,14 @@
         conf.setOutputValueClass(PositionListWritable.class);
 
         FileInputFormat.setInputPaths(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(inputPath + "-step1"));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
         if (numReducers == 0)
             conf.setNumReduceTasks(numReducers + 2);
         else
             conf.setNumReduceTasks(numReducers);
 
         FileSystem dfs = FileSystem.get(conf);
-        dfs.delete(new Path(inputPath + "-step1"), true);
+        dfs.delete(new Path(outputPath), true);
         JobClient.runJob(conf);
     }
 
@@ -132,7 +132,7 @@
             conf.setOutputValueClass(PositionListAndKmerWritable.class);
         }
 
-        FileInputFormat.setInputPaths(conf, new Path(inputPath + "-step1"));
+        FileInputFormat.setInputPaths(conf, new Path(inputPath));
         FileOutputFormat.setOutputPath(conf, new Path(outputPath));
         conf.setNumReduceTasks(numReducers);
         FileSystem dfs = FileSystem.get(conf);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index cc922de..c04ba46 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -54,11 +54,12 @@
         }
         
         PathNodeInitial inith3 = new PathNodeInitial();
-        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
-        copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", HDFS_MARKPATHS + "toUpdate", conf); // PathNodeInitial.run now also takes a toUpdate output path
+        copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+        copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
 
         MergePathsH3Driver h3 = new MergePathsH3Driver();
-        h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+        h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
         copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
     }
 
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 9e799f3..45a2d0a 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -11,27 +12,27 @@
 
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
 import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 
 @SuppressWarnings("deprecation")
-public class TestPathMergeH4 extends GenomixMiniClusterTest {
-    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
-    protected String HDFS_SEQUENCE = "/00-sequence/";
-    protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
-    protected String HDFS_MARKPATHS = "/02-pathmark/";
-    protected String HDFS_MERGED = "/03-pathmerge/";
+public class TestPathMergeH4 extends HadoopMiniClusterTest {
+    protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+    protected final String SEQUENCE = "/00-sequence/";
+    protected final String GRAPHBUILD = "/01-graphbuild/";
+    protected final String MERGED = "/02-pathmerge/";
     
-    protected String GRAPHBUILD_FILE = "graphbuild.result";
-    protected String PATHMARKS_FILE = "markpaths.result";
-    protected String PATHMERGE_FILE = "h4.mergepath.result";
-    protected boolean regenerateGraph = true;
+    protected final String ACTUAL = "src/test/resources/actual/";
+    
+    protected final boolean regenerateGraph = true;
     
     {
         KMER_LENGTH = 5;
         READ_LENGTH = 8;
-        HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MARKPATHS, HDFS_MERGED));
+        HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
         conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
         conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
     }
@@ -39,34 +40,45 @@
     @Test
     public void TestMergeOneIteration() throws Exception {
         cleanUpOutput();
-        if (regenerateGraph) {
-            copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
-            buildGraph();
-            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
-        } else {
-            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
-        }
+        prepareGraph();
         
-        PathNodeInitial inith4 = new PathNodeInitial();
-        inith4.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
-        copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
         MergePathsH4Driver h4 = new MergePathsH4Driver();
-        h4.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
-        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
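+        // merge directly from the graphbuild output; the positional args are assumed to be (input, output, numReducers, kmerLength, mergeRounds) by analogy with the H3 driver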
+        h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 5, conf);
+        copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
     }
 
+//    @Test
+    public void testPathNode() throws IOException {
+        cleanUpOutput();
+        prepareGraph();
+
+        // identify head and tail path nodes with PathNodeInitial
+        PathNodeInitial inith4 = new PathNodeInitial();
+        inith4.run(GRAPHBUILD, "/toMerge", "/completed", conf);
+    }
 
 
-    public void buildGraph() throws Exception {
+    public void buildGraph() throws IOException {
         JobConf buildConf = new JobConf(conf);  // use a separate conf so we don't interfere with other jobs 
-        FileInputFormat.setInputPaths(buildConf, HDFS_SEQUENCE);
-        FileOutputFormat.setOutputPath(buildConf, new Path(HDFS_GRAPHBUILD));
-        buildConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
-        buildConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        driver.runJob(new GenomixJobConf(buildConf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        String fileFormat = buildConf.get(GenomixJobConf.OUTPUT_FORMAT);
-        boolean resultsAreText = GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat);
-        copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, resultsAreText, buildConf);
+        
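+        // run both graph-building jobs (onlyTest1stJob=false) with binary sequence-file output (seqOutput=true)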
+        GraphBuildingDriver tldriver = new GraphBuildingDriver();
+        tldriver.run(SEQUENCE, GRAPHBUILD, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF);
+        
+        boolean resultsAreText = false;
+        copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
+    }
+    
+    private void prepareGraph() throws IOException {
+        if (regenerateGraph) {
+            copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+            buildGraph();
+            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+        } else {
+            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+        }
     }
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..9a113f1
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,199 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests:
+ * it manages a MiniDFSCluster/MiniMRCluster pair and offers helpers for
+ * copying data between HDFS and the local expected/actual directories.
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+
+    // subclass should modify this to include the HDFS directories that should be cleaned up
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+    protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+    protected static MiniDFSCluster dfsCluster;
+    protected static MiniMRCluster mrCluster;
+    protected static FileSystem dfs;
+    protected static JobConf conf = new JobConf();
+    protected static int numberOfNC = 1;
+    protected static int numPartitionPerMachine = 1;
+
+    @BeforeClass
+    public static void setUpMiniCluster() throws Exception {
+        cleanupStores();
+        startHDFS();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+    }
+
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
+     * Also locally store the binary-formatted result if available.
+     */
+    protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf) throws IOException {
+        if (resultsAreText) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile), false, conf, null);
+        } else {
+            // file is binary
+            // save the entire binary output dir
+            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile + ".bindir"), false, conf);
+            
+            // scan the output files (top level and one directory down) for the first non-empty part file
+            FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
+            FileStatus validFile = null;
+            for (FileStatus f : files) {
+                if (f.getLen() != 0) {
+                    validFile = f;
+                    break;
+                }
+            }
+            if (validFile == null) {
+                throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+            }
+
+            // also re-read the records, dumping them locally both as text and as a single merged sequence file
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            if (filePathTo.exists() && filePathTo.isDirectory()) {
+                filePathTo = new File(localDestFile + "/data");
+            }
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+                    + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+            reader.close(); // this reader was only opened to probe the key/value classes
+
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            for (FileStatus f : files) {
+                if (f.getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+
+    }
+
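+    /*
+     * Compare an actual dump against its expected file; position-list fields given in
+     * poslistField are compared without regard to their ordering.
+     */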
+    protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath);
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected static void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected static void startHDFS() throws IOException {
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        dfs = dfsCluster.getFileSystem();
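+        // 4 task trackers with 2 local dirs each (assumed meaning of MiniMRCluster's (numTaskTrackers, namenodeURI, numDir) constructor)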
+        mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+        System.out.println(dfs.getUri().toString());
+
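+        // write the live cluster configuration to conf.xml so external drivers (e.g. GraphBuildingDriver) can load it as their defaultConfPath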
+        DataOutputStream confOutput = new DataOutputStream(
+                new FileOutputStream(new File(HADOOP_CONF)));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+
+    protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(new Path(localSrc), dest);
+    }
+
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        cleanupHDFS();
+    }
+
+    protected static void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        mrCluster.shutdown();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
deleted file mode 100644
index b142f87..0000000
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mrunit.MapDriver;
-import org.apache.hadoop.mrunit.ReduceDriver;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-@SuppressWarnings("deprecation")
-public class TestPathNodeInitial {
-    PositionWritable posn1 = new PositionWritable(0, (byte) 1);
-    PositionWritable posn2 = new PositionWritable(1, (byte) 1);
-    PositionWritable posn3 = new PositionWritable(2, (byte) 1);
-    PositionWritable posn4 = new PositionWritable(3, (byte) 1);
-    PositionWritable posn5 = new PositionWritable(5, (byte) 1);
-    String kmerString = "ATGCA";
-    KmerBytesWritable kmer = new KmerBytesWritable(kmerString.length(), kmerString);
-    JobConf conf = new JobConf();
-    MultipleOutputs mos = new MultipleOutputs(conf); 
-
-    {
-        conf.set("sizeKmer", String.valueOf(kmerString.length()));
-    }
-
-    @Test
-    public void testNoNeighbors() throws IOException {
-        NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
-                new PositionListWritable(), new PositionListWritable(), kmer);
-        MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), noNeighborNode);
-        // test mapper
-        new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
-                .withMapper(new PathNodeInitial.PathNodeInitialMapper())
-                .withConfiguration(conf)
-                .withInput(noNeighborNode, NullWritable.get())
-                .withOutput(posn1, output)
-                .runTest();
-        // test reducer
-//        MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
-        new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
-                .withReducer(new PathNodeInitial.PathNodeInitialReducer())
-                .withConfiguration(conf)
-                .withInput(posn1, Arrays.asList(output))
-                .runTest();
-    }
-}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
index f6236d2..2517810 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
@@ -42,15 +42,16 @@
         FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
         FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
         startHadoop();
-        TestGroupbyKmer();
-        TestMapKmerToRead();
+//        TestGroupbyKmer();
+//        TestMapKmerToRead();
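+        // only the group-by-readID stage is exercised for now; the earlier stages are disabled above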
+        TestGroupByReadID();
         cleanupHadoop();
     }
 
     public void TestGroupbyKmer() throws Exception {
         GraphBuildingDriver tldriver = new GraphBuildingDriver();
         tldriver.run(HDFS_PATH, RESULT_PATH, COUNT_REDUCER, SIZE_KMER, READ_LENGTH, true, false, HADOOP_CONF_PATH);
-        dumpGroupByKmerResult();
+        dumpResult();
     }
 
     public void TestMapKmerToRead() throws Exception {
@@ -89,11 +90,6 @@
         dfsCluster.shutdown();
     }
 
-    private void dumpGroupByKmerResult() throws IOException {
-        Path src = new Path(HDFS_PATH + "-step1");
-        Path dest = new Path(ACTUAL_RESULT_DIR);
-        dfs.copyToLocalFile(src, dest);
-    }
     
     private void dumpResult() throws IOException {
         Path src = new Path(RESULT_PATH);