refactor h4 update mapper to use update messages
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 7e6df5e..e6b1575 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
 
 import java.io.File;
@@ -31,18 +46,36 @@
 import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
+/*
+ * a probabilistic merge algorithm for merging long single paths (chains without only 1 incoming and outgoing edge)
+ * The merge is guaranteed to succeed, but not all nodes that could be merged in an iteration will be.
+ * 
+ * There are two steps to the merge: 
+ *    1. (H4UpdatesMapper & H4UpdatesReducer): the direction of the merge is chosen and all 
+ *       neighbor's edges are updated with the merge intent 
+ *    2. H4MergeMapper & H4MergeReducer): the nodes initiating the merge are "sent" to their neighbors, kmers are combined, and edges 
+ *       are again updated (since the merge-initiator may be neighbor to another merging node).  
+ */
 @SuppressWarnings("deprecation")
 public class MergePathsH4 extends Configured implements Tool {
 
+    private enum MergeDir {
+        NO_MERGE,
+        FORWARD,
+        BACKWARD
+
+    }
+
     /*
-     * Mapper class: Partition the graph using random pseudoheads.
-     * Heads send themselves to their successors, and all others map themselves.
+     * Mapper class: randomly chooses a direction to merge s.t. if a merge takes place, it will be successful.
+     *      Sends update messages to all of this node's neighbors who their new neighbor will be
      */
-    public static class MergePathsH4Mapper extends MapReduceBase implements
+    public static class H4UpdatesMapper extends MapReduceBase implements
             Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
         private static long randSeed;
         private Random randGenerator;
@@ -51,6 +84,12 @@
         private int KMER_SIZE;
         private PositionWritable outputKey;
         private NodeWithFlagWritable outputValue;
+        private PositionWritable mergeMsgKey;
+        private NodeWithFlagWritable mergeMsgValue;
+        private PositionWritable updateMsgKey;
+        private NodeWithFlagWritable updateMsgValue;
+        private NodeWritable updateMsgNode;
+
         private NodeWritable curNode;
         private PositionWritable curID;
         private PositionWritable nextID;
@@ -60,13 +99,16 @@
         private boolean curHead;
         private boolean nextHead;
         private boolean prevHead;
-        private boolean willMerge;
+        private MergeDir mergeDir;
+        private byte inFlag;
         private byte headFlag;
         private byte tailFlag;
-        private byte outFlag;
+        private byte mergeMsgFlag;
+        private byte nextDir;
+        private byte prevDir;
 
         public void configure(JobConf conf) {
-            
+
             randSeed = conf.getLong("randomSeed", 0);
             randGenerator = new Random(randSeed);
             probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -74,6 +116,12 @@
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             outputValue = new NodeWithFlagWritable(KMER_SIZE);
             outputKey = new PositionWritable();
+
+            mergeMsgKey = new PositionWritable();
+            mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+            updateMsgKey = new PositionWritable();
+            updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
             curNode = new NodeWritable(KMER_SIZE);
             curID = new PositionWritable();
             nextID = new PositionWritable();
@@ -91,11 +139,13 @@
          */
         protected boolean setNextInfo(NodeWritable node) {
             if (node.getFFList().getCountOfPosition() > 0) {
+                nextDir = MessageFlag.DIR_FF;
                 nextID.set(node.getFFList().getPosition(0));
                 nextHead = isNodeRandomHead(nextID);
                 return true;
             }
             if (node.getFRList().getCountOfPosition() > 0) {
+                nextDir = MessageFlag.DIR_FR;
                 nextID.set(node.getFRList().getPosition(0));
                 nextHead = isNodeRandomHead(nextID);
                 return true;
@@ -108,11 +158,13 @@
          */
         protected boolean setPrevInfo(NodeWritable node) {
             if (node.getRRList().getCountOfPosition() > 0) {
+                prevDir = MessageFlag.DIR_RR;
                 prevID.set(node.getRRList().getPosition(0));
                 prevHead = isNodeRandomHead(prevID);
                 return true;
             }
             if (node.getRFList().getCountOfPosition() > 0) {
+                prevDir = MessageFlag.DIR_RF;
                 prevID.set(node.getRFList().getPosition(0));
                 prevHead = isNodeRandomHead(prevID);
                 return true;
@@ -122,47 +174,33 @@
 
         @Override
         public void map(PositionWritable key, NodeWithFlagWritable value,
-                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
-                throws IOException {
-            // Node may be marked as head b/c it's a real head or a real tail
-            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
-            tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
-            outFlag = (byte) (headFlag | tailFlag);
-            
-            // only PATH vertices are present. Find the ID's for my neighbors
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            inFlag = value.getFlag();
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
-            
+
+            headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
+            tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
+            mergeMsgFlag = (byte) (headFlag | tailFlag);
+
             curHead = isNodeRandomHead(curID);
             // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
             // We prevent merging towards non-path nodes
             hasNext = setNextInfo(curNode) && tailFlag == 0;
             hasPrev = setPrevInfo(curNode) && headFlag == 0;
-            willMerge = false;
-            
-            // TODO: need to update edges in neighboring nodes
-            
-            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
-                // true HEAD met true TAIL. this path is complete
-                outFlag |= MessageFlag.MSG_SELF;
-                outputValue.set(outFlag, curNode);
-                output.collect(curID, outputValue);
-                return;
-            }
+            mergeDir = MergeDir.NO_MERGE; // no merge to happen
+
+            // decide where we're going to merge to
             if (hasNext || hasPrev) {
                 if (curHead) {
                     if (hasNext && !nextHead) {
-                        // compress this head to the forward tail
-                        outFlag |= MessageFlag.FROM_PREDECESSOR;
-                        outputValue.set(outFlag, curNode);
-                        output.collect(nextID, outputValue);
-                        willMerge = true;
+                        // merge forward
+                        mergeMsgFlag |= nextDir;
+                        mergeDir = MergeDir.FORWARD;
                     } else if (hasPrev && !prevHead) {
-                        // compress this head to the reverse tail
-                        outFlag |= MessageFlag.FROM_SUCCESSOR;
-                        outputValue.set(outFlag, curNode);
-                        output.collect(prevID, outputValue);
-                        willMerge = true;
+                        // merge backwards
+                        mergeMsgFlag |= prevDir;
+                        mergeDir = MergeDir.BACKWARD;
                     }
                 } else {
                     // I'm a tail
@@ -170,42 +208,80 @@
                         if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
                             // tails on both sides, and I'm the "local minimum"
                             // compress me towards the tail in forward dir
-                            outFlag |= MessageFlag.FROM_PREDECESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(nextID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= nextDir;
+                            mergeDir = MergeDir.FORWARD;
                         }
                     } else if (!hasPrev) {
                         // no previous node
                         if (!nextHead && curID.compareTo(nextID) < 0) {
                             // merge towards tail in forward dir
-                            outFlag |= MessageFlag.FROM_PREDECESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(nextID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= nextDir;
+                            mergeDir = MergeDir.FORWARD;
                         }
                     } else if (!hasNext) {
                         // no next node
                         if (!prevHead && curID.compareTo(prevID) < 0) {
                             // merge towards tail in reverse dir
-                            outFlag |= MessageFlag.FROM_SUCCESSOR;
-                            outputValue.set(outFlag, curNode);
-                            output.collect(prevID, outputValue);
-                            willMerge = true;
+                            mergeMsgFlag |= prevDir;
+                            mergeDir = MergeDir.BACKWARD;
                         }
                     }
                 }
             }
 
-            // if we didn't send ourselves to some other node, remap ourselves for the next round
-            if (!willMerge) {
-                outFlag |= MessageFlag.MSG_SELF;
-                outputValue.set(outFlag, curNode);
-                output.collect(curID, outputValue);
+            if (mergeDir == MergeDir.NO_MERGE) {
+                mergeMsgFlag |= MessageFlag.MSG_SELF;
+                mergeMsgValue.set(mergeMsgFlag, curNode);
+                output.collect(curID, mergeMsgValue);
+            } else {
+                // this node will do a merge next round
+                mergeMsgFlag |= MessageFlag.MSG_UPDATE_MERGE;
+                mergeMsgValue.set(mergeMsgFlag, curNode);
+                output.collect(curID, mergeMsgValue);
+
+                sendUpdateToNeighbors(curNode, (byte) (mergeMsgFlag & MessageFlag.DIR_MASK), output);
             }
-            else {
-                // TODO send update to this node's neighbors
-                //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+        }
+
+        /*
+         * when performing a merge, an update message needs to be sent to my neighbors
+         */
+        private void sendUpdateToNeighbors(NodeWritable node, byte mergeDir,
+                OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+            PositionWritable mergeSource = node.getNodeID();
+            PositionWritable mergeTarget = node.getListFromDir(mergeDir).getPosition(0);
+
+            // I need to notify in the opposite direction as I'm merging
+            Iterator<PositionWritable> posIterator1;
+            byte dir1;
+            Iterator<PositionWritable> posIterator2;
+            byte dir2;
+            switch (mergeDir) {
+                case MessageFlag.DIR_FF:
+                case MessageFlag.DIR_FR:
+                    // merging forward; tell my previous neighbors
+                    posIterator1 = node.getRRList().iterator();
+                    dir1 = MessageFlag.DIR_RR;
+                    posIterator2 = node.getRFList().iterator();
+                    dir2 = MessageFlag.DIR_RF;
+                    break;
+                case MessageFlag.DIR_RF:
+                case MessageFlag.DIR_RR:
+                    posIterator1 = node.getFFList().iterator();
+                    dir1 = MessageFlag.DIR_FF;
+                    posIterator2 = node.getFRList().iterator();
+                    dir2 = MessageFlag.DIR_FR;
+                    break;
+                default:
+                    throw new IOException("Unrecognized direction in sendUpdateToNeighbors: " + mergeDir);
+            }
+            while (posIterator1.hasNext()) {
+                updateMsgValue.setAsUpdateMessage(mergeDir, dir1, mergeSource, mergeTarget);
+                collector.collect(posIterator1.next(), updateMsgValue);
+            }
+            while (posIterator2.hasNext()) {
+                updateMsgValue.setAsUpdateMessage(mergeDir, dir2, mergeSource, mergeTarget);
+                collector.collect(posIterator2.next(), outputValue);
             }
         }
     }
@@ -222,7 +298,7 @@
         private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
         private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
         private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
-        
+
         private int KMER_SIZE;
         private NodeWithFlagWritable inputValue;
         private NodeWithFlagWritable outputValue;
@@ -248,19 +324,19 @@
         @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
-                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
-                throws IOException {
-        	toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
-        	completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
-        	updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
+                OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+            toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+            completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+            updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
                 if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
-                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+                            && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
                         // complete path (H & T meet in this node)
                         completeCollector.collect(key, inputValue);
-                    } else { 
+                    } else {
                         // FROM_SELF => no merging this round. remap self
                         toMergeCollector.collect(key, inputValue);
                     }
@@ -314,7 +390,7 @@
                     curNode.mergeForwardPre(prevNode, KMER_SIZE);
                     reporter.incrCounter("genomix", "num_merged", 1);
                 }
-                
+
                 outputValue.set(outFlag, curNode);
                 if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
@@ -333,7 +409,8 @@
     /*
      * Run one iteration of the mergePaths algorithm
      */
-    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput, JobConf baseConf) throws IOException {
+    public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
+            JobConf baseConf) throws IOException {
         JobConf conf = new JobConf(baseConf);
         conf.setJarByClass(MergePathsH4.class);
         conf.setJobName("MergePathsH4 " + inputPath);
@@ -350,17 +427,17 @@
         conf.setOutputKeyClass(PositionWritable.class);
         conf.setOutputValueClass(NodeWithFlagWritable.class);
 
-        conf.setMapperClass(MergePathsH4Mapper.class);
+        conf.setMapperClass(H4UpdatesMapper.class);
         conf.setReducerClass(MergePathsH4Reducer.class);
-        
+
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, NodeWithFlagWritable.class);
+                PositionWritable.class, NodeWithFlagWritable.class);
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, NodeWithFlagWritable.class);
+                PositionWritable.class, NodeWithFlagWritable.class);
         MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
-        		PositionWritable.class, NodeWithFlagWritable.class);
-        
-        FileSystem dfs = FileSystem.get(conf); 
+                PositionWritable.class, NodeWithFlagWritable.class);
+
+        FileSystem dfs = FileSystem.get(conf);
         // clean output dirs
         dfs.delete(outputPath, true);
         dfs.delete(new Path(toMergeOutput), true);
@@ -368,18 +445,21 @@
         dfs.delete(new Path(updatesOutput), true);
 
         RunningJob job = JobClient.runJob(conf);
-        
+
         // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
-        if (!dfs.rename(new Path(outputPath + File.separator +  MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(toMergeOutput))) {
+        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(
+                toMergeOutput))) {
             dfs.mkdirs(new Path(toMergeOutput));
         }
-        if (!dfs.rename(new Path(outputPath + File.separator +  MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(completeOutput))) {
+        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(
+                completeOutput))) {
             dfs.mkdirs(new Path(completeOutput));
         }
-        if (!dfs.rename(new Path(outputPath + File.separator +  MergePathsH4Reducer.UPDATES_OUTPUT), new Path(updatesOutput))) {
+        if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.UPDATES_OUTPUT), new Path(
+                updatesOutput))) {
             dfs.mkdirs(new Path(updatesOutput));
         }
-        
+
         return job;
     }
 
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 4009fc6..198c769 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -25,7 +25,7 @@
 
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.H4UpdatesMapper;
 import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -130,7 +130,7 @@
         conf.setOutputKeyClass(PositionWritable.class);
         conf.setOutputValueClass(NodeWithFlagWritable.class);
 
-        conf.setMapperClass(MergePathsH4Mapper.class);
+        conf.setMapperClass(H4UpdatesMapper.class);
         conf.setReducerClass(MergePathsH4Reducer.class);
 
         FileSystem.get(conf).delete(new Path(outputPath), true);