MergePathsH4: use MultipleOutputs for toMerge/complete/update streams (prep for update code)
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index b240833..926afe1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
@@ -21,10 +22,15 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
@@ -213,8 +219,12 @@
private static class MergePathsH4Reducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
private MultipleOutputs mos;
- public static final String COMPLETE_OUTPUT = "complete";
- public static final String UPDATES_OUTPUT = "update";
+ private static final String TO_MERGE_OUTPUT = "toMerge";
+ private static final String COMPLETE_OUTPUT = "complete";
+ private static final String UPDATES_OUTPUT = "update";
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> updatesCollector;
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
@@ -243,16 +253,19 @@
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
+ toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
inputValue.set(values.next());
if (!values.hasNext()) {
if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
// complete path (H & T meet in this node)
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ completeCollector.collect(key, inputValue);
} else {
// FROM_SELF => no merging this round. remap self
- output.collect(key, inputValue);
+ toMergeCollector.collect(key, inputValue);
}
} else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
// FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
@@ -308,9 +321,9 @@
outputValue.set(outFlag, curNode);
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+ completeCollector.collect(key, outputValue);
} else {
- output.collect(key, outputValue);
+ toMergeCollector.collect(key, outputValue);
}
}
}
@@ -319,16 +332,17 @@
/*
* Run one iteration of the mergePaths algorithm
*/
- public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput, JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
- FileInputFormat.addInputPath(conf, new Path(inputPath));
- FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ FileInputFormat.addInputPaths(conf, inputPath);
+ Path outputPath = new Path(inputPath + ".h4merge.tmp");
+ FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputFormat(NullOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
@@ -337,10 +351,24 @@
conf.setMapperClass(MergePathsH4Mapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
-
- FileSystem.get(conf).delete(new Path(outputPath), true);
-
- return JobClient.runJob(conf);
+
+ MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+ MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+ MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(outputPath, true); // clean output dir
+ RunningJob job = JobClient.runJob(conf);
+
+ // move the tmp outputs to the arg-spec'ed dirs
+ dfs.rename(new Path(outputPath, MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(toMergeOutput)); // File.pathSeparator (":"/";") would corrupt the path; use Path(parent, child)
+ dfs.rename(new Path(outputPath, MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(completeOutput));
+ dfs.rename(new Path(outputPath, MergePathsH4Reducer.UPDATES_OUTPUT), new Path(updatesOutput));
+
+ return job;
}
@Override
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 497e926..2974d4c 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.genomix.hadoop.pmcommon;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -33,8 +34,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -48,6 +49,9 @@
*/
@SuppressWarnings("deprecation")
public class PathNodeInitial extends Configured implements Tool {
+
+ public static final String COMPLETE_OUTPUT = "complete";
+ public static final String TO_MERGE_OUTPUT = "toMerge";
public static class PathNodeInitialMapper extends MapReduceBase implements
Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
@@ -71,6 +75,7 @@
public void map(NodeWritable key, NullWritable value,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
+
inDegree = key.inDegree();
outDegree = key.outDegree();
if (inDegree == 1 && outDegree == 1) {
@@ -132,46 +137,48 @@
public static class PathNodeInitialReducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
- private MultipleOutputs mos;
- private static final String COMPLETE_OUTPUT = "complete";
- private int KMER_SIZE;
+ protected MultipleOutputs mos;
+ protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+ protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+ protected int KMER_SIZE;
+
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
private NodeWritable nodeToKeep;
private int count;
private byte flag;
- private boolean isComplete;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- nodeToKeep = new NodeWritable(KMER_SIZE);
+ nodeToKeep = new NodeWritable(KMER_SIZE);
}
- @SuppressWarnings("unchecked")
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
-
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+
inputValue.set(values.next());
if (!values.hasNext()) {
if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
// non-path node. Store in "complete" output
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ completeCollector.collect(key, inputValue);
} else {
// FROM_SELF => need to keep this PATH node
- output.collect(key, inputValue);
+ toMergeCollector.collect(key, inputValue);
}
}
} else {
// multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node
count = 0;
flag = MessageFlag.EMPTY_MESSAGE;
- isComplete = false;
while (true) { // process values; break when no more
count++;
if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
@@ -179,7 +186,7 @@
flag |= MessageFlag.FROM_SELF;
nodeToKeep.set(inputValue.getNode());
if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
- isComplete = true;
+ flag |= MessageFlag.IS_COMPLETE;
}
} else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
flag |= MessageFlag.IS_TAIL;
@@ -199,11 +206,11 @@
if ((flag & MessageFlag.FROM_SELF) > 0) {
if ((flag & MessageFlag.IS_COMPLETE) > 0) {
// non-path node. Store in "complete" output
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ completeCollector.collect(key, inputValue);
} else {
// only keep simple path nodes
outputValue.set(flag, nodeToKeep);
- output.collect(key, outputValue);
+ toMergeCollector.collect(key, outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
if ((flag & MessageFlag.IS_HEAD) > 0) {
@@ -218,21 +225,26 @@
}
}
}
+
+ public void close() throws IOException {
+ mos.close();
+ }
}
/*
* Mark the head, tail, and simple path nodes in one map-reduce job.
*/
- public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(PathNodeInitial.class);
conf.setJobName("PathNodeInitial " + inputPath);
- FileInputFormat.addInputPath(conf, new Path(inputPath));
- FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ FileInputFormat.addInputPaths(conf, inputPath);
+ Path outputPath = new Path(inputPath + ".initialMerge.tmp");
+ FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputFormat(NullOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
@@ -241,16 +253,27 @@
conf.setMapperClass(PathNodeInitialMapper.class);
conf.setReducerClass(PathNodeInitialReducer.class);
+
+ MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+ MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
- FileSystem.get(conf).delete(new Path(outputPath), true);
-
- return JobClient.runJob(conf);
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(outputPath, true); // clean output dir
+ RunningJob job = JobClient.runJob(conf);
+
+ // move the tmp outputs to the arg-spec'ed dirs
+ dfs.rename(new Path(outputPath, TO_MERGE_OUTPUT), new Path(toMergeOutput)); // File.pathSeparator (":"/";") would corrupt the path; use Path(parent, child)
+ dfs.rename(new Path(outputPath, COMPLETE_OUTPUT), new Path(completeOutput));
+
+ return job;
}
@Override
- public int run(String[] arg0) throws Exception {
- // TODO Auto-generated method stub
- return 0;
+ public int run(String[] args) throws Exception {
+ // was ToolRunner.run(new Configuration(), new PathNodeInitial(), args) — infinite recursion, since ToolRunner calls back into this run(String[])
+ run(args[0], args[1], args[2], new JobConf(getConf())); // args: <inputPath> <toMergeOutput> <completeOutput>; returns 0 below on success
}
public static void main(String[] args) throws Exception {