non-path nodes are pushed to "complete" output
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index 38dde9d..c2b0e52 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -40,6 +40,7 @@
public static final byte IS_HEAD = 1 << 3;
public static final byte IS_TAIL = 1 << 4;
public static final byte IS_PSEUDOHEAD = 1 << 5;
+ public static final byte IS_COMPLETE = 1 << 6; // marks a non-path node; PathNodeInitial routes such nodes to its "complete" output
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index c8386ea..5fbf33f 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -126,27 +127,34 @@
output.collect(outputKey, outputValue);
}
}
+ // push this non-path node to the "complete" output
+ outputValue.set((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), key);
+ output.collect(key.getNodeID(), outputValue);
}
}
}
public static class PathNodeInitialReducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+ private MultipleOutputs mos;
+ private static final String COMPLETE_OUTPUT = "complete";
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
private NodeWritable nodeToKeep;
private int count;
private byte flag;
+ private boolean isComplete;
public void configure(JobConf conf) {
+ mos = new MultipleOutputs(conf); // NOTE(review): no close() override calling mos.close() is visible in this patch -- confirm one exists, otherwise the named-output writers are never closed
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
nodeToKeep = new NodeWritable(KMER_SIZE);
}
+ @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -155,19 +163,28 @@
inputValue.set(values.next());
if (!values.hasNext()) {
if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
- // FROM_SELF => need to keep this PATH node
- output.collect(key, inputValue);
+ if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
+ // non-path node. Store in "complete" output
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ } else {
+ // FROM_SELF => need to keep this PATH node
+ output.collect(key, inputValue);
+ }
}
} else {
// multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node
count = 0;
flag = MessageFlag.EMPTY_MESSAGE;
+ isComplete = false; // instance field, so it must be reset for every key
while (true) { // process values; break when no more
count++;
if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// SELF -> keep this node
flag |= MessageFlag.FROM_SELF;
nodeToKeep.set(inputValue.getNode());
+ if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) { // the SELF message marks this node as non-path
+ isComplete = true; // remembered separately; the bit is not copied into `flag`
+ }
} else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
flag |= MessageFlag.IS_TAIL;
} else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
@@ -176,7 +193,7 @@
if (!values.hasNext()) {
break;
} else {
- inputValue = values.next();
+ inputValue.set(values.next()); // copy into our own buffer rather than aliasing the iterator's object (presumably reused by Hadoop between next() calls -- confirm)
}
}
if (count < 2) {
@@ -184,20 +201,24 @@
+ String.valueOf(count));
}
if ((flag & MessageFlag.FROM_SELF) > 0) {
- // only keep simple path nodes
- outputValue.set(flag, nodeToKeep);
- output.collect(key, outputValue);
-
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((flag & MessageFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((flag & MessageFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ if (isComplete) { // non-path node (IS_COMPLETE is tracked in isComplete, never in `flag`): store in "complete" output
+ outputValue.set((byte) (flag | MessageFlag.IS_COMPLETE), nodeToKeep); // emit the saved SELF node, not the last message read
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+ } else {
+ // only keep simple path nodes
+ outputValue.set(flag, nodeToKeep);
+ output.collect(key, outputValue);
+
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((flag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((flag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
}
} else {
- // this is a non-path node.
- // TODO: keep this node in a "completed" reducer
+ throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
}
}
}