Merge branch 'anbangx/fullstack_genomix' into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 17f1a11..7d1738e 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.genomix.data.KmerUtil;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
 
 /**
  * Variable kmer length byteswritable
@@ -503,6 +504,25 @@
         appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
         clearLeadBit();
     }
+    
+    public void mergeWithKmerInDir(byte dir, int initialKmerSize, KmerBytesWritable kmer) {
+        switch (dir & DirectionFlag.DIR_MASK) { // dispatch on the direction bits of dir only
+            case DirectionFlag.DIR_FF:
+                mergeWithFFKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_FR:
+                mergeWithFRKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_RF:
+                mergeWithRFKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_RR:
+                mergeWithRRKmer(initialKmerSize, kmer);
+                break;
+            default: // masked value matches none of the four edge directions
+                throw new IllegalArgumentException("Direction not recognized: " + dir);
+        }
+    }
 
     public static void appendOneByteAtPosition(int k, byte onebyte, byte[] buffer, int start, int length) {
         int position = start + length - 1 - k / 4;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 1fef016..a5ba50d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -290,11 +290,15 @@
         private NodeWithFlagWritable outputValue;
         private NodeWritable curNode;
         private PositionWritable outPosn;
-        private ArrayList<NodeWithFlagWritable> updateMsgs;
         private boolean sawCurNode;
         private byte outFlag;
         private byte inFlag;
 
+        // to avoid GC churn from update messages, we keep them all in one list and overwrite existing entries via set() rather than creating new nodes
+        private ArrayList<NodeWithFlagWritable> updateMsgs;
+        private int updateMsgsSize;
+        private int updateMsgsCount;
+
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new NodeWithFlagWritable(KMER_SIZE);
@@ -302,6 +306,7 @@
             curNode = new NodeWritable(KMER_SIZE);
             outPosn = new PositionWritable();
             updateMsgs = new ArrayList<NodeWithFlagWritable>();
+            updateMsgsSize = updateMsgs.size();
         }
 
         /*
@@ -315,14 +320,14 @@
         public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
                 OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
             sawCurNode = false;
-            updateMsgs.clear();
-            
+            updateMsgsCount = 0;
+
             byte inDir;
             while (values.hasNext()) {
                 inputValue.set(values.next());
                 inFlag = inputValue.getFlag();
                 inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
-                
+
                 switch (inDir) {
                     case MessageFlag.MSG_UPDATE_MERGE:
                     case MessageFlag.MSG_SELF:
@@ -334,13 +339,13 @@
                         sawCurNode = true;
                         if (inDir == MessageFlag.MSG_SELF) {
                             outPosn.set(curNode.getNodeID());
-                        } else {  // MSG_UPDATE_MERGE
+                        } else { // MSG_UPDATE_MERGE
                             // merge messages are sent to their merge recipient
                             outPosn.set(curNode.getListFromDir(inDir).getPosition(0));
                         }
                         break;
                     case MessageFlag.MSG_UPDATE_EDGE:
-                        updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+                        addUpdateMessage(inputValue);
                         break;
                     default:
                         throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
@@ -349,15 +354,23 @@
 
             // process all the update messages for this node
             // I have no idea how to make this more efficient...
-            for (NodeWithFlagWritable updateMsg : updateMsgs) {
-                NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+            for (int i=0; i < updateMsgsCount; i++) {
+                NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
             }
             outputValue.set(outFlag, curNode);
             output.collect(outPosn, outputValue);
         }
+
+        private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+            if (updateMsgsCount < updateMsgs.size()) { // live list size, not the stale updateMsgsSize snapshot
+                updateMsgs.get(updateMsgsCount).set(myInputValue); // overwrite a pooled entry from an earlier reduce() call
+            } else {
+                updateMsgs.add(new NodeWithFlagWritable(myInputValue)); // pool exhausted: store a copy, not a reference
+            }
+            updateMsgsCount++; // slots [0, updateMsgsCount) now hold this key's messages
+        }
     }
-    
-    
+
     /*
      * Mapper class: sends the update messages to their (already decided) destination
      */
@@ -413,15 +426,11 @@
             inFlag = value.getFlag();
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
-            
+
         }
 
     }
 
-    
-    
-    
-
     /*
      * Reducer class: processes the update messages from updateMapper
      */
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index a7e8157..6729c78 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -38,7 +38,8 @@
         public static final byte EXTRA_FLAG = 1 << 6;
     }
 
-    public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete, PositionWritable nodeToAdd) {
+    public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete,
+            PositionWritable nodeToAdd) {
         byte neighborToMeDir = mirrorDirection(neighborDir);
         byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
 
@@ -53,12 +54,10 @@
         node.getListFromDir(neighborToMergeDir).append(nodeToAdd);
     }
 
-
-
     /*
      * Returns the edge dir for B->A when the A->B edge is type @dir
      */
-    public byte mirrorDirection(byte dir) {
+    public static byte mirrorDirection(byte dir) {
         switch (dir) {
             case MessageFlag.DIR_FF:
                 return MessageFlag.DIR_RR;
@@ -77,7 +76,7 @@
      * When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge, 
      * returns the new edge type for A->C
      */
-    public byte flipDirection(byte neighborDir, byte mergeDir) {
+    public static byte flipDirection(byte neighborDir, byte mergeDir) {
         switch (mergeDir) {
 
             case MessageFlag.DIR_FF:
@@ -118,22 +117,7 @@
             // remove position and merge its position lists with node
             if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
                 // need to remove updateNode from the specified PositionList
-                switch (updateFlag & MessageFlag.DIR_MASK) {
-                    case MessageFlag.DIR_FF:
-                        node.getFFList().remove(updateNode.getNodeID());
-                        break;
-                    case MessageFlag.DIR_FR:
-                        node.getFRList().remove(updateNode.getNodeID());
-                        break;
-                    case MessageFlag.DIR_RF:
-                        node.getRFList().remove(updateNode.getNodeID());
-                        break;
-                    case MessageFlag.DIR_RR:
-                        node.getRRList().remove(updateNode.getNodeID());
-                        break;
-                    default:
-                        throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
-                }
+                node.getListFromDir(updateFlag).remove(updateNode.getNodeID());
             }
             // now merge positionlists from update and node
             node.getFFList().appendList(updateNode.getFFList());
@@ -142,25 +126,40 @@
             node.getRRList().appendList(updateNode.getRRList());
         } else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
             // this message wants to merge node with updateNode.
-            // the direction flag indicates how the merge should take place.
-            // TODO send update or remove edge that I merge with
+            // the direction flag indicates node's relationship with updateNode
+            node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
+            node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
+
+            // merge my edges with the incoming node's edges, accounting for if the node flipped in 
+            // the merge and if it's my predecessor or successor
             switch (updateFlag & MessageFlag.DIR_MASK) {
                 case MessageFlag.DIR_FF:
-                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+                    // node merging with me is FF to me
                     node.getFFList().set(updateNode.getFFList());
-                    // TODO not just FF list here-- FR as well
+                    node.getFRList().set(updateNode.getFRList());
+                    // update isn't allowed to have any other successors (mirror & flip)
+                    if (updateNode.getRFList().getCountOfPosition() > 0)
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_FR:
-                    // FIXME not sure if this should be reverse-complement or just reverse...
-                    node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
-                    node.getFRList().set(updateNode.getFRList());
+                    // flip edges
+                    node.getFFList().set(updateNode.getRFList());
+                    node.getFRList().set(updateNode.getRRList());
+                    if (updateNode.getFFList().getCountOfPosition() > 0)
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_RF:
-
+                    // flip edges
+                    node.getRFList().set(updateNode.getFFList());
+                    node.getRRList().set(updateNode.getFRList());
+                    if (updateNode.getRRList().getCountOfPosition() > 0)
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 case MessageFlag.DIR_RR:
-                    node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+                    node.getRFList().set(updateNode.getRFList());
                     node.getRRList().set(updateNode.getRRList());
+                    if (updateNode.getFRList().getCountOfPosition() > 0)
+                        throw new IOException("Invalid merge detected!  Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
                     break;
                 default:
                     throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);