Merge branch 'nanzhang/genomix' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
index 1a758c7..954abbe 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -43,7 +43,7 @@
}
for(int i = 0; i < listPosZeroInRead.getCountOfPosition(); i++) {
VertexID.set(listPosZeroInRead.getPosition(i));
- outputListAndKmer.set(listPosNonZeroInRead, key);
+ outputListAndKmer.set(listPosNonZeroInRead, key);//you suo bianhua1. -1 2. qudiao tongyihangde
output.collect(VertexID, outputListAndKmer);
}
for(int i = 0; i < listPosNonZeroInRead.getCountOfPosition(); i++) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index 1da7d83..40889b2 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -9,7 +9,6 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritableFactory;
import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
@@ -20,6 +19,33 @@
@SuppressWarnings("deprecation")
public class DeepGraphBuildingReducer extends MapReduceBase implements
Reducer<PositionWritable, PositionListAndKmerWritable, NodeWritable, NullWritable> {
+ public class nodeToMergeState {
+ public static final byte NOT_UPDATE = 0;
+ public static final byte ASSIGNED_BY_RIGHTNODE = 1;
+ private byte state;
+
+ public nodeToMergeState() {
+ state = NOT_UPDATE;
+ }
+
+ public void setToNotUpdate() {
+ state = NOT_UPDATE;
+ }
+
+ public void setToAssignedByRightNode() {
+ state = ASSIGNED_BY_RIGHTNODE;
+ }
+
+ public String getState() {
+ switch (state) {
+ case NOT_UPDATE:
+ return "NOT_UPDATE";
+ case ASSIGNED_BY_RIGHTNODE:
+ return "ASSIGNED_BY_RIGHTNODE";
+ }
+ return "ERROR_STATE";
+ }
+ }
private PositionListAndKmerWritable nodeListAndKmer = new PositionListAndKmerWritable();
private PositionListAndKmerWritable nodeSuccListAndKmer = new PositionListAndKmerWritable();
@@ -29,122 +55,88 @@
private PositionListWritable incomingList = new PositionListWritable();
private PositionListWritable outgoingList = new PositionListWritable();
private NullWritable nullWritable = NullWritable.get();
+ private nodeToMergeState state = new nodeToMergeState();
private int KMER_SIZE;
-
public void configure(JobConf job) {
KMER_SIZE = job.getInt("sizeKmer", 0);
}
- public enum nodeToMergeState {
- SRC_UPDATE_FROM_VALUES,
- SRC_NODE_NON_UPDATE,
- SRC_ASSIGNED_BY_RIGHTNODE;
- }
-
@Override
public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
- //initialize the Start point in Read
int readID = key.getReadID();
byte posInRead = 0;
+ assembleNodeInitialization(readID, posInRead, values);
+ posInRead = (byte) (posInRead + 2);
+ //----LOOP
+ while (values.hasNext()) {
+ assembleNodeFromValues(readID, posInRead, values);
+ posInRead = (byte) (posInRead + 1);
+ if (nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true) {
+ nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
+ state.setToNotUpdate();
+ }
+ else {
+ state.setToAssignedByRightNode();
+ output.collect(nodeToMerge, nullWritable);
+ }
+ }
+ }
+
+ public void assembleNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
if (values.hasNext()) {
nodeListAndKmer.set(values.next());
- incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+ } else {
+ throw new IOException("the size of values emerge bug!");
+ }
+ if (state.getState().equals("ASSIGNED_BY_RIGHTNODE")) {
+ nodeToMerge.set(nodeToBeMerged);
+ }
+ incomingList.reset();
+ incomingList.append(readID, (byte) (posInRead - 1));
+ if (nodeSuccListAndKmer.getVertexIDList() != null)
+ outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+ outgoingList.append(readID, (byte) (posInRead + 1));
+ nodeToBeMerged.setNodeID(readID, (byte) posInRead);
+ nodeToBeMerged.setIncomingList(incomingList);
+ nodeToBeMerged.setOutgoingList(outgoingList);
+ nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
+ }
+
+ public void assembleNodeInitialization(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
+ if (values.hasNext()) {
+ nodeListAndKmer.set(values.next());
+ } else {
+ throw new IOException("the size of values emerge bug!");
}
if (values.hasNext()) {
nodeSuccListAndKmer.set(values.next());
- if (nodeSuccListAndKmer.getVertexIDList() != null)
- outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
}
+ incomingList.reset();
+ incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+ outgoingList.reset();
+ if (nodeSuccListAndKmer.getVertexIDList() != null)
+ outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
outgoingList.append(readID, (byte) (posInRead + 1));
startNodeInRead.setNodeID(readID, posInRead);
startNodeInRead.setIncomingList(incomingList);
startNodeInRead.setOutgoingList(outgoingList);
startNodeInRead.setKmer(nodeListAndKmer.getKmer());
- output.collect(startNodeInRead, nullWritable);
- posInRead++;
- //----initialize the nodeToMerge
+ //---------
nodeListAndKmer.set(nodeSuccListAndKmer);
incomingList.reset();
- incomingList.append(key.getReadID(), key.getPosInRead());
+ incomingList.append(readID, posInRead);
if (values.hasNext()) {
nodeSuccListAndKmer.set(values.next());
if (nodeSuccListAndKmer.getVertexIDList() != null)
outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
}
- outgoingList.append(readID, (byte) (posInRead + 1));
+ outgoingList.append(readID, (byte) (posInRead + 2));
nodeToMerge.setNodeID(readID, (byte) posInRead);
nodeToMerge.setIncomingList(incomingList);
nodeToMerge.setOutgoingList(outgoingList);
nodeToMerge.setKmer(nodeListAndKmer.getKmer());
- posInRead++;
- //----LOOP
- nodeToMergeState srcState = nodeToMergeState.SRC_NODE_NON_UPDATE;
- boolean srcOrTarget = true;
- while (values.hasNext()) {
- switch (srcState.toString()) {
- case "SRC_UPDATE_FROM_VALUES":
- srcOrTarget = true;
- getNodeFromValues(readID, posInRead, values, srcOrTarget);
- posInRead++;
- srcOrTarget = false;
- getNodeFromValues(readID, posInRead, values, srcOrTarget);
- posInRead++;
- break;
- case "SRC_NODE_NON_UPDATE":
- srcOrTarget = false;
- getNodeFromValues(readID, posInRead, values, srcOrTarget);
- posInRead++;
- break;
- case "SRC_ASSIGNED_BY_RIGHTNODE":
- nodeToMerge.set(nodeToBeMerged);
- srcOrTarget = false;
- getNodeFromValues(readID, posInRead, values, srcOrTarget);
- posInRead++;
- break;
- }
- if(nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true){
- nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
- srcState = nodeToMergeState.SRC_NODE_NON_UPDATE;
- }
- else{
- if(nodeToMerge.isPathNode() == false && nodeToBeMerged.isPathNode() == true){
-
- srcState = nodeToMergeState.SRC_ASSIGNED_BY_RIGHTNODE;
- output.collect(nodeToBeMerged, nullWritable);
- }
- else {
- srcState = nodeToMergeState.SRC_UPDATE_FROM_VALUES;
- output.collect(nodeToMerge, nullWritable);
- output.collect(nodeToBeMerged, nullWritable);
- }
- }
-
- }
- }
-
- public void getNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values,
- boolean srcOrTarget) {
- if (values.hasNext()) {
- nodeListAndKmer.set(values.next());
- incomingList.reset();
- incomingList.append(readID, (byte) (posInRead - 1));
- }
- if (values.hasNext()) {
- nodeSuccListAndKmer.set(values.next());
- if (nodeSuccListAndKmer.getVertexIDList() != null)
- outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
- }
- outgoingList.append(readID, (byte) (posInRead + 1));
- if (srcOrTarget == true) {
- nodeToMerge.setNodeID(readID, (byte) posInRead);
- nodeToMerge.setIncomingList(incomingList);
- nodeToMerge.setOutgoingList(outgoingList);
- nodeToMerge.setKmer(nodeListAndKmer.getKmer());
- } else {
- nodeToBeMerged.setNodeID(readID, (byte) posInRead);
- nodeToBeMerged.setIncomingList(incomingList);
- nodeToBeMerged.setOutgoingList(outgoingList);
- nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
- }
+ state.setToNotUpdate();
}
}