WIP H4 1 mapper + 2 reducers
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 9675fc0..2c64139 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -117,7 +117,7 @@
     }
 
     public PositionListWritable getListFromDir(byte dir) {
-        switch (dir) {
+        switch (dir & DirectionFlag.DIR_MASK) {
             case DirectionFlag.DIR_FF:
                 return getFFList();
             case DirectionFlag.DIR_FR:
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index e6b1575..1fef016 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -17,6 +17,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -82,13 +83,9 @@
         private float probBeingRandomHead;
 
         private int KMER_SIZE;
-        private PositionWritable outputKey;
         private NodeWithFlagWritable outputValue;
-        private PositionWritable mergeMsgKey;
         private NodeWithFlagWritable mergeMsgValue;
-        private PositionWritable updateMsgKey;
         private NodeWithFlagWritable updateMsgValue;
-        private NodeWritable updateMsgNode;
 
         private NodeWritable curNode;
         private PositionWritable curID;
@@ -115,11 +112,8 @@
 
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
-            outputKey = new PositionWritable();
 
-            mergeMsgKey = new PositionWritable();
             mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
-            updateMsgKey = new PositionWritable();
             updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
 
             curNode = new NodeWritable(KMER_SIZE);
@@ -287,9 +281,151 @@
     }
 
     /*
-     * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
+     * Reducer class: processes the update messages from updateMapper
      */
-    private static class MergePathsH4Reducer extends MapReduceBase implements
+    private static class H4UpdatesReducer extends MapReduceBase implements
+            Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+        private int KMER_SIZE;
+        private NodeWithFlagWritable inputValue;
+        private NodeWithFlagWritable outputValue;
+        private NodeWritable curNode;
+        private PositionWritable outPosn;
+        private ArrayList<NodeWithFlagWritable> updateMsgs;
+        private boolean sawCurNode;
+        private byte outFlag;
+        private byte inFlag;
+
+        public void configure(JobConf conf) {
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            inputValue = new NodeWithFlagWritable(KMER_SIZE);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
+            curNode = new NodeWritable(KMER_SIZE);
+            outPosn = new PositionWritable();
+            updateMsgs = new ArrayList<NodeWithFlagWritable>();
+        }
+
+        /*
+         * Process updates from mapper
+         * 
+         * (non-Javadoc)
+         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            sawCurNode = false;
+            updateMsgs.clear();
+            
+            byte inDir;
+            while (values.hasNext()) {
+                inputValue.set(values.next());
+                inFlag = inputValue.getFlag();
+                inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
+                
+                switch (inDir) {
+                    case MessageFlag.MSG_UPDATE_MERGE:
+                    case MessageFlag.MSG_SELF:
+                        if (sawCurNode)
+                            throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
+                                    + "  current self: " + inputValue.getNode());
+                        curNode.set(inputValue.getNode());
+                        outFlag = inFlag;
+                        sawCurNode = true;
+                        if (inDir == MessageFlag.MSG_SELF) {
+                            outPosn.set(curNode.getNodeID());
+                        } else {  // MSG_UPDATE_MERGE
+                            // merge messages are sent to their merge recipient
+                            outPosn.set(curNode.getListFromDir(inDir).getPosition(0));
+                        }
+                        break;
+                    case MessageFlag.MSG_UPDATE_EDGE:
+                        updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+                        break;
+                    default:
+                        throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
+                }
+            }
+
+            // process all the update messages for this node
+            // I have no idea how to make this more efficient...
+            for (NodeWithFlagWritable updateMsg : updateMsgs) {
+                NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+            }
+            outputValue.set(outFlag, curNode);
+            output.collect(outPosn, outputValue);
+        }
+    }
+    
+    
+    /*
+     * Mapper class: sends the update messages to their (already decided) destination
+     */
+    public static class H4MergeMapper extends MapReduceBase implements
+            Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+        private static long randSeed;
+        private Random randGenerator;
+        private float probBeingRandomHead;
+
+        private int KMER_SIZE;
+        private NodeWithFlagWritable outputValue;
+        private NodeWithFlagWritable mergeMsgValue;
+        private NodeWithFlagWritable updateMsgValue;
+
+        private NodeWritable curNode;
+        private PositionWritable curID;
+        private PositionWritable nextID;
+        private PositionWritable prevID;
+        private boolean hasNext;
+        private boolean hasPrev;
+        private boolean curHead;
+        private boolean nextHead;
+        private boolean prevHead;
+        private MergeDir mergeDir;
+        private byte inFlag;
+        private byte headFlag;
+        private byte tailFlag;
+        private byte mergeMsgFlag;
+        private byte nextDir;
+        private byte prevDir;
+
+        public void configure(JobConf conf) {
+
+            randSeed = conf.getLong("randomSeed", 0);
+            randGenerator = new Random(randSeed);
+            probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+            mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+            updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
+            curNode = new NodeWritable(KMER_SIZE);
+            curID = new PositionWritable();
+            nextID = new PositionWritable();
+            prevID = new PositionWritable();
+        }
+
+        @Override
+        public void map(PositionWritable key, NodeWithFlagWritable value,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            inFlag = value.getFlag();
+            curNode.set(value.getNode());
+            curID.set(curNode.getNodeID());
+            
+        }
+
+    }
+
+    
+    
+    
+
+    /*
+     * Reducer class: processes the update messages from updateMapper
+     */
+    private static class H4MergeReducer2 extends MapReduceBase implements
             Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private MultipleOutputs mos;
         private static final String TO_MERGE_OUTPUT = "toMerge";
@@ -428,13 +564,13 @@
         conf.setOutputValueClass(NodeWithFlagWritable.class);
 
         conf.setMapperClass(H4UpdatesMapper.class);
-        conf.setReducerClass(MergePathsH4Reducer.class);
+        conf.setReducerClass(H4UpdatesReducer.class);
 
-        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
                 PositionWritable.class, NodeWithFlagWritable.class);
-        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
                 PositionWritable.class, NodeWithFlagWritable.class);
-        MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+        MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
                 PositionWritable.class, NodeWithFlagWritable.class);
 
         FileSystem dfs = FileSystem.get(conf);
@@ -447,16 +583,16 @@
         RunningJob job = JobClient.runJob(conf);
 
         // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
-        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
                 toMergeOutput))) {
             dfs.mkdirs(new Path(toMergeOutput));
         }
-        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
                 completeOutput))) {
             dfs.mkdirs(new Path(completeOutput));
         }
-        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.UPDATES_OUTPUT), new Path(
-                updatesOutput))) {
+        if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
+                new Path(updatesOutput))) {
             dfs.mkdirs(new Path(updatesOutput));
         }
 
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 304cf0c..a7e8157 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -143,10 +143,12 @@
         } else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
             // this message wants to merge node with updateNode.
             // the direction flag indicates how the merge should take place.
+            // TODO send update or remove edge that I merge with
             switch (updateFlag & MessageFlag.DIR_MASK) {
                 case MessageFlag.DIR_FF:
                     node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
                     node.getFFList().set(updateNode.getFFList());
+                    // TODO not just FF list here-- FR as well
                     break;
                 case MessageFlag.DIR_FR:
                     // FIXME not sure if this should be reverse-complement or just reverse...
@@ -185,6 +187,10 @@
         set(flag, node);
     }
 
+    public NodeWithFlagWritable(NodeWithFlagWritable other) {
+        this(other.flag, other.node);
+    }
+
     public void set(NodeWithFlagWritable right) {
         set(right.getFlag(), right.getNode());
     }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 0523812..3c46dc7 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -61,7 +61,7 @@
 
     private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
 
-    private static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+    public static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
             OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
         Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
         while (posIterator.hasNext()) {
@@ -73,7 +73,7 @@
         }
     }
 
-    private static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+    public static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
             OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
         Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
         while (posIterator.hasNext()) {
@@ -179,7 +179,7 @@
 
         /*
          * Segregate nodes into three bins:
-         *   1. mergeable nodes (marked as H/T)
+         *   1. mergeable nodes (maybe marked H or T)
          *   2. non-mergeable nodes that are candidates for updates
          *   3. non-mergeable nodes that are not path neighbors and won't be updated
          *