Merge branch 'nanzhang/genomix' into genomix/fullstack_genomix

Conflicts:
	genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
index 1a758c7..954abbe 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -43,7 +43,7 @@
         }
         for(int i = 0; i < listPosZeroInRead.getCountOfPosition(); i++) {
             VertexID.set(listPosZeroInRead.getPosition(i));
-            outputListAndKmer.set(listPosNonZeroInRead, key);
+            outputListAndKmer.set(listPosNonZeroInRead, key);//you suo bianhua1. -1 2. qudiao tongyihangde 
             output.collect(VertexID, outputListAndKmer);
         }
         for(int i = 0; i < listPosNonZeroInRead.getCountOfPosition(); i++) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index 1da7d83..40889b2 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -9,7 +9,6 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
 import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
 import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritableFactory;
 import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
@@ -20,6 +19,33 @@
 @SuppressWarnings("deprecation")
 public class DeepGraphBuildingReducer extends MapReduceBase implements
         Reducer<PositionWritable, PositionListAndKmerWritable, NodeWritable, NullWritable> {
+    public class nodeToMergeState {
+        public static final byte NOT_UPDATE = 0;
+        public static final byte ASSIGNED_BY_RIGHTNODE = 1;
+        private byte state;
+
+        public nodeToMergeState() {
+            state = NOT_UPDATE;
+        }
+
+        public void setToNotUpdate() {
+            state = NOT_UPDATE;
+        }
+
+        public void setToAssignedByRightNode() {
+            state = ASSIGNED_BY_RIGHTNODE;
+        }
+
+        public String getState() {
+            switch (state) {
+                case NOT_UPDATE:
+                    return "NOT_UPDATE";
+                case ASSIGNED_BY_RIGHTNODE:
+                    return "ASSIGNED_BY_RIGHTNODE";
+            }
+            return "ERROR_STATE";
+        }
+    }
 
     private PositionListAndKmerWritable nodeListAndKmer = new PositionListAndKmerWritable();
     private PositionListAndKmerWritable nodeSuccListAndKmer = new PositionListAndKmerWritable();
@@ -29,122 +55,88 @@
     private PositionListWritable incomingList = new PositionListWritable();
     private PositionListWritable outgoingList = new PositionListWritable();
     private NullWritable nullWritable = NullWritable.get();
+    private nodeToMergeState state = new nodeToMergeState();
     private int KMER_SIZE;
-    
     public void configure(JobConf job) {
         KMER_SIZE = job.getInt("sizeKmer", 0);
     }
-    public enum nodeToMergeState {
-        SRC_UPDATE_FROM_VALUES,
-        SRC_NODE_NON_UPDATE,
-        SRC_ASSIGNED_BY_RIGHTNODE;
-    }
-
     @Override
     public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
             OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
-        //initialize the Start point in Read
         int readID = key.getReadID();
         byte posInRead = 0;
+        assembleNodeInitialization(readID, posInRead, values);
+        posInRead = (byte) (posInRead + 2);
+        //----LOOP
+        while (values.hasNext()) {
+            assembleNodeFromValues(readID, posInRead, values);
+            posInRead = (byte) (posInRead + 1);
+            if (nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true) {
+                nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
+                state.setToNotUpdate();
+            }
+            else {
+                state.setToAssignedByRightNode();
+                output.collect(nodeToMerge, nullWritable);
+            }
+        }
+    }
+
+    public void assembleNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+            throws IOException {
         if (values.hasNext()) {
             nodeListAndKmer.set(values.next());
-            incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+        } else {
+            throw new IOException("the size of values emerge bug!");
+        }
+        if (state.getState().equals("ASSIGNED_BY_RIGHTNODE")) {
+            nodeToMerge.set(nodeToBeMerged);
+        }
+        incomingList.reset();
+        incomingList.append(readID, (byte) (posInRead - 1));
+        if (nodeSuccListAndKmer.getVertexIDList() != null)
+            outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+        outgoingList.append(readID, (byte) (posInRead + 1));
+        nodeToBeMerged.setNodeID(readID, (byte) posInRead);
+        nodeToBeMerged.setIncomingList(incomingList);
+        nodeToBeMerged.setOutgoingList(outgoingList);
+        nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
+    }
+
+    public void assembleNodeInitialization(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+            throws IOException {
+        if (values.hasNext()) {
+            nodeListAndKmer.set(values.next());
+        } else {
+            throw new IOException("the size of values emerge bug!");
         }
         if (values.hasNext()) {
             nodeSuccListAndKmer.set(values.next());
-            if (nodeSuccListAndKmer.getVertexIDList() != null)
-                outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
         }
+        incomingList.reset();
+        incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+        outgoingList.reset();
+        if (nodeSuccListAndKmer.getVertexIDList() != null)
+            outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
         outgoingList.append(readID, (byte) (posInRead + 1));
         startNodeInRead.setNodeID(readID, posInRead);
         startNodeInRead.setIncomingList(incomingList);
         startNodeInRead.setOutgoingList(outgoingList);
         startNodeInRead.setKmer(nodeListAndKmer.getKmer());
-        output.collect(startNodeInRead, nullWritable);
-        posInRead++;
-        //----initialize the nodeToMerge
+        //---------
         nodeListAndKmer.set(nodeSuccListAndKmer);
         incomingList.reset();
-        incomingList.append(key.getReadID(), key.getPosInRead());
+        incomingList.append(readID, posInRead);
         if (values.hasNext()) {
             nodeSuccListAndKmer.set(values.next());
             if (nodeSuccListAndKmer.getVertexIDList() != null)
                 outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
         }
-        outgoingList.append(readID, (byte) (posInRead + 1));
+        outgoingList.append(readID, (byte) (posInRead + 2));
         nodeToMerge.setNodeID(readID, (byte) posInRead);
         nodeToMerge.setIncomingList(incomingList);
         nodeToMerge.setOutgoingList(outgoingList);
         nodeToMerge.setKmer(nodeListAndKmer.getKmer());
-        posInRead++;
-        //----LOOP
-        nodeToMergeState srcState = nodeToMergeState.SRC_NODE_NON_UPDATE;
-        boolean srcOrTarget = true;
-        while (values.hasNext()) {
-            switch (srcState.toString()) {
-                case "SRC_UPDATE_FROM_VALUES":
-                    srcOrTarget = true;
-                    getNodeFromValues(readID, posInRead, values, srcOrTarget);
-                    posInRead++;
-                    srcOrTarget = false;
-                    getNodeFromValues(readID, posInRead, values, srcOrTarget);
-                    posInRead++;
-                    break;
-                case "SRC_NODE_NON_UPDATE":
-                    srcOrTarget = false;
-                    getNodeFromValues(readID, posInRead, values, srcOrTarget);
-                    posInRead++;
-                    break;
-                case "SRC_ASSIGNED_BY_RIGHTNODE":
-                    nodeToMerge.set(nodeToBeMerged);
-                    srcOrTarget = false;
-                    getNodeFromValues(readID, posInRead, values, srcOrTarget);
-                    posInRead++;
-                    break;
-            }
-            if(nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true){
-                nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
-                srcState = nodeToMergeState.SRC_NODE_NON_UPDATE;
-            }
-            else{
-                if(nodeToMerge.isPathNode() == false && nodeToBeMerged.isPathNode() == true){
-                    
-                    srcState = nodeToMergeState.SRC_ASSIGNED_BY_RIGHTNODE;
-                    output.collect(nodeToBeMerged, nullWritable);
-                }
-                else {
-                    srcState = nodeToMergeState.SRC_UPDATE_FROM_VALUES;
-                    output.collect(nodeToMerge, nullWritable);
-                    output.collect(nodeToBeMerged, nullWritable);
-                }
-            }
-
-        }
-    }
-
-    public void getNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values,
-            boolean srcOrTarget) {
-        if (values.hasNext()) {
-            nodeListAndKmer.set(values.next());
-            incomingList.reset();
-            incomingList.append(readID, (byte) (posInRead - 1));
-        }
-        if (values.hasNext()) {
-            nodeSuccListAndKmer.set(values.next());
-            if (nodeSuccListAndKmer.getVertexIDList() != null)
-                outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
-        }
-        outgoingList.append(readID, (byte) (posInRead + 1));
-        if (srcOrTarget == true) {
-            nodeToMerge.setNodeID(readID, (byte) posInRead);
-            nodeToMerge.setIncomingList(incomingList);
-            nodeToMerge.setOutgoingList(outgoingList);
-            nodeToMerge.setKmer(nodeListAndKmer.getKmer());
-        } else {
-            nodeToBeMerged.setNodeID(readID, (byte) posInRead);
-            nodeToBeMerged.setIncomingList(incomingList);
-            nodeToBeMerged.setOutgoingList(outgoingList);
-            nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
-        }
+        state.setToNotUpdate();
     }
 }