PathNodeInitial uses 2 output bins instead of 3
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 3c46dc7..c1d8a71 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,8 +34,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -57,7 +57,6 @@
public static final String COMPLETE_OUTPUT = "complete";
public static final String TO_MERGE_OUTPUT = "toMerge";
- public static final String TO_UPDATE_OUTPUT = "toUpdate";
private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
@@ -89,7 +88,6 @@
Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private PositionWritable outputKey;
private NodeWithFlagWritable outputValue;
private int inDegree;
private int outDegree;
@@ -98,7 +96,6 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- outputKey = new PositionWritable();
}
/*
@@ -159,7 +156,6 @@
Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
- private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
@@ -192,7 +188,6 @@
OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
throws IOException {
completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
outputFlag = MessageFlag.EMPTY_MESSAGE;
sawSelf = false;
@@ -211,35 +206,34 @@
nodeToKeep.set(inputValue.getNode());
}
}
+ if (!sawSelf) {
+ throw new IOException("Didn't see a self node in PathNodeInitial! flag: " + outputFlag);
+ }
- if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
- if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
- // non-path or single path nodes
- if ((outputFlag & NEAR_PATH) > 0) {
- // non-path, but an update candidate
- outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
- toUpdateCollector.collect(key, outputValue);
- } else {
- // non-path or single-node path. Store in "complete" output
- outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
- completeCollector.collect(key, outputValue);
- }
- } else {
- // path nodes that are mergeable
- outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
- outputValue.set(outputFlag, nodeToKeep);
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+ // non-path nodes or single-node paths
+ if ((outputFlag & NEAR_PATH) > 0) {
+ // non-path, but an update candidate: emit to the toMerge output with an empty flag
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
toMergeCollector.collect(key, outputValue);
-
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- }
+ } else {
+ // non-path or single-node path. Store in "complete" output
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ completeCollector.collect(key, outputValue);
}
} else {
- throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
+ // path nodes that are mergeable
+ outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+ outputValue.set(outputFlag, nodeToKeep);
+ toMergeCollector.collect(key, outputValue);
+
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
}
}
@@ -251,41 +245,30 @@
/*
* Mark the head, tail, and simple path nodes in one map-reduce job.
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
- JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(PathNodeInitial.class);
conf.setJobName("PathNodeInitial " + inputPath);
-
- FileInputFormat.addInputPaths(conf, inputPath);
- // Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
- Path outputPath = new Path(toMergeOutput);
- FileOutputFormat.setOutputPath(conf, outputPath);
-
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(NullOutputFormat.class);
-
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
-
- conf.setMapperClass(PathNodeInitialMapper.class);
- conf.setReducerClass(PathNodeInitialReducer.class);
-
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.addInputPaths(conf, inputPath);
+ FileOutputFormat.setOutputPath(conf, new Path(toMergeOutput));
MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
-
FileSystem dfs = FileSystem.get(conf);
- dfs.delete(outputPath, true); // clean output dir
+ dfs.delete(new Path(toMergeOutput), true); // clean output dir
+
+ conf.setMapperClass(PathNodeInitialMapper.class);
+ conf.setReducerClass(PathNodeInitialReducer.class);
RunningJob job = JobClient.runJob(conf);
// move the tmp outputs to the arg-spec'ed dirs
- dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
- dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
- // dfs.rename(outputPath, new Path(toMergeOutput));
+ dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
return job;
}