H4 driver works with MultipleOutputs
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index c8cfe6d..69af73d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -327,6 +327,10 @@
}
}
}
+
+ public void close() throws IOException {
+ mos.close();
+ }
}
/*
@@ -360,10 +364,15 @@
PositionWritable.class, MessageWritableNodeWithFlag.class);
FileSystem dfs = FileSystem.get(conf);
- dfs.delete(outputPath, true); // clean output dir
+ // clean output dirs
+ dfs.delete(outputPath, true);
+ dfs.delete(new Path(toMergeOutput), true);
+ dfs.delete(new Path(completeOutput), true);
+ dfs.delete(new Path(updatesOutput), true);
+
RunningJob job = JobClient.runJob(conf);
- // move the tmp outputs to the arg-spec'ed dirs
+ // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(toMergeOutput))) {
dfs.mkdirs(new Path(toMergeOutput));
}
@@ -378,14 +387,13 @@
}
@Override
- public int run(String[] arg0) throws Exception {
- // TODO Auto-generated method stub
- return 0;
+ public int run(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
+ return res;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
- System.out.println("Ran the job fine!");
System.exit(res);
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index f8cabee..72be4b5 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.tools.ant.util.IdentityMapper;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -29,130 +30,131 @@
@SuppressWarnings("deprecation")
public class MergePathsH4Driver {
- private static final String TO_MERGE = "toMerge";
- private static final String COMPLETE = "complete";
- private static final String UPDATES = "updates";
- private String mergeOutput;
- private String completeOutput;
- private String updatesOutput;
+ private static final String TO_MERGE = "toMerge";
+ private static final String COMPLETE = "complete";
+ private static final String UPDATES = "updates";
+ private String mergeOutput;
+ private String completeOutput;
+ private String updatesOutput;
- private void setOutputPaths(String basePath, int mergeIteration) {
- basePath = basePath.replaceAll("/$", ""); // strip trailing slash
- mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
- completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
- updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
- }
+ private void setOutputPaths(String basePath, int mergeIteration) {
+ basePath = basePath.replaceAll("/$", ""); // strip trailing slash
+ mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+ completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
+ updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
+ }
- private static class Options {
- @Option(name = "-inputpath", usage = "the input path", required = true)
- public String inputPath;
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
- @Option(name = "-outputpath", usage = "the output path", required = true)
- public String outputPath;
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
- @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
- public String mergeResultPath;
+ @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+ public String mergeResultPath;
- @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
- public int numReducers;
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
- @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
- public int sizeKmer;
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
- @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
- public int mergeRound;
+ @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
+ public int mergeRound;
- @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
- public String hadoopConf;
+ @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
+ public String hadoopConf;
- }
+ }
- /*
- * Main driver for path merging. Given a graph, this driver runs
- * PathNodeInitial to ID heads and tails, then does up to @mergeRound
- * iterations of path merging. Updates during the merge are batch-processed
- * at the end in a final update job.
- */
- public void run(String inputGraphPath, String outputGraphPath,
- int numReducers, int sizeKmer, int mergeRound,
- String defaultConfPath, JobConf defaultConf) throws IOException {
- JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
- if (defaultConfPath != null) {
- baseConf.addResource(new Path(defaultConfPath));
- }
- baseConf.setNumReduceTasks(numReducers);
- baseConf.setInt("sizeKmer", sizeKmer);
- FileSystem dfs = FileSystem.get(baseConf);
+ /*
+ * Main driver for path merging. Given a graph, this driver runs
+ * PathNodeInitial to ID heads and tails, then does up to @mergeRound
+ * iterations of path merging. Updates during the merge are batch-processed
+ * at the end in a final update job.
+ */
+ public void run(String inputGraphPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
+ String defaultConfPath, JobConf defaultConf) throws IOException {
+ JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
+ if (defaultConfPath != null) {
+ baseConf.addResource(new Path(defaultConfPath));
+ }
+ baseConf.setNumReduceTasks(numReducers);
+ baseConf.setInt("sizeKmer", sizeKmer);
+ FileSystem dfs = FileSystem.get(baseConf);
- int iMerge = 0;
+ int iMerge = 0;
- // identify head and tail nodes with pathnode initial
- PathNodeInitial inith4 = new PathNodeInitial();
- setOutputPaths(inputGraphPath, iMerge);
- String prevToMergeOutput = inputGraphPath;
- inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+ // identify head and tail nodes with pathnode initial
+ PathNodeInitial inith4 = new PathNodeInitial();
+ setOutputPaths(inputGraphPath, iMerge);
+ String prevToMergeOutput = inputGraphPath;
+ System.out.println("initial run. toMerge: " + mergeOutput + ", complete: " + completeOutput);
+ inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+ dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
+ dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
- // several iterations of merging
- MergePathsH4 merger = new MergePathsH4();
- for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
- prevToMergeOutput = mergeOutput;
- setOutputPaths(inputGraphPath, iMerge);
- merger.run(prevToMergeOutput, mergeOutput, completeOutput,
- updatesOutput, baseConf);
- if (dfs.listStatus(new Path(mergeOutput)).length == 0) {
- // no output from previous run-- we are done!
- break;
- }
- }
+ // several iterations of merging
+ MergePathsH4 merger = new MergePathsH4();
+ for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
+ prevToMergeOutput = mergeOutput;
+ setOutputPaths(inputGraphPath, iMerge);
+ merger.run(prevToMergeOutput, mergeOutput, completeOutput, updatesOutput, baseConf);
+ dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
+ dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
+ dfs.copyToLocalFile(new Path(updatesOutput), new Path("i" + iMerge +"-updates"));
+
+ if (dfs.listStatus(new Path(mergeOutput)) == null || dfs.listStatus(new Path(mergeOutput)).length == 0) {
+ // no output from previous run-- we are done!
+ break;
+ }
+ }
+
+ // finally, combine all the completed paths and update messages to
+ // create a single merged graph output
+ dfs.delete(new Path(outputGraphPath), true); // clear any previous
+ // output
+ // use all the "complete" and "update" outputs in addition to the final
+ // (possibly empty) toMerge directories
+ // as input to the final update step. This builds a comma-delim'ed
+ // String of said files.
+ final String lastMergeOutput = mergeOutput;
+ PathFilter updateFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path arg0) {
+ String path = arg0.toString();
+ System.out.println("equals last: " + path + " vs " + lastMergeOutput + " = " + path.endsWith(lastMergeOutput));
+ return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
+ }
+ };
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$", "") + "*"), updateFilter)) {
+ sb.append(delim).append(file.getPath());
+ delim = ",";
+ }
+ String finalInputs = sb.toString();
+ System.out.println("This is the final sacrifice: " + finalInputs);
+ // TODO run the update iteration
+ }
- // finally, combine all the completed paths and update messages to
- // create a single merged graph output
- dfs.delete(new Path(outputGraphPath), true); // clear any previous
- // output
- // use all the "complete" and "update" outputs in addition to the final
- // (possibly empty) toMerge directories
- // as input to the final update step. This builds a comma-delim'ed
- // String of said files.
- final String lastMergeOutput = mergeOutput;
- PathFilter updateFilter = new PathFilter() {
- @Override
- public boolean accept(Path arg0) {
- String path = arg0.toString();
- return path.contains(COMPLETE + "_i")
- || path.contains(UPDATES + "_i")
- || path.equals(lastMergeOutput);
- }
- };
- StringBuilder sb = new StringBuilder();
- String delim = "";
- for (FileStatus file : dfs.globStatus(new Path("*"), updateFilter)) {
- sb.append(delim).append(file);
- delim = ",";
- }
- String finalInputs = sb.toString();
- // TODO run the update iteration
- }
+ public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
+ String defaultConfPath) throws IOException {
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+ }
- public void run(String inputPath, String outputGraphPath, int numReducers,
- int sizeKmer, int mergeRound, String defaultConfPath)
- throws IOException {
- run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
- defaultConfPath, null);
- }
+ public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
+ JobConf defaultConf) throws IOException {
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+ }
- public void run(String inputPath, String outputGraphPath, int numReducers,
- int sizeKmer, int mergeRound, JobConf defaultConf)
- throws IOException {
- run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
- null, defaultConf);
- }
-
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
- MergePathsH4Driver driver = new MergePathsH4Driver();
- driver.run(options.inputPath, options.outputPath, options.numReducers,
- options.sizeKmer, options.mergeRound, null, null);
- }
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ MergePathsH4Driver driver = new MergePathsH4Driver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
+ null, null);
+ }
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 667c4b1..45a2d0a 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -24,6 +25,8 @@
protected final String GRAPHBUILD = "/01-graphbuild/";
protected final String MERGED = "/02-pathmerge/";
+ protected final String ACTUAL = "src/test/resources/actual/";
+
protected final boolean regenerateGraph = true;
{
@@ -37,22 +40,27 @@
@Test
public void TestMergeOneIteration() throws Exception {
cleanUpOutput();
- if (regenerateGraph) {
- copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
- buildGraph();
- copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
- } else {
- copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
- }
+ prepareGraph();
MergePathsH4Driver h4 = new MergePathsH4Driver();
- h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 1, conf);
+ h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 5, conf);
copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
}
+// @Test
+ public void testPathNode() throws IOException {
+ cleanUpOutput();
+ prepareGraph();
+
+ // identify head and tail nodes with pathnode initial
+ PathNodeInitial inith4 = new PathNodeInitial();
+ inith4.run(GRAPHBUILD, "/toMerge", "/completed", conf);
+ }
+
+
- public void buildGraph() throws Exception {
+ public void buildGraph() throws IOException {
JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
FileInputFormat.setInputPaths(buildConf, SEQUENCE);
FileOutputFormat.setOutputPath(buildConf, new Path(GRAPHBUILD));
@@ -63,4 +71,14 @@
boolean resultsAreText = false;
copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
}
+
+ private void prepareGraph() throws IOException {
+ if (regenerateGraph) {
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+ buildGraph();
+ copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ } else {
+ copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ }
+ }
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index 2cd3b53..9a113f1 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -94,6 +94,9 @@
FileSystem lfs = FileSystem.getLocal(new Configuration());
lfs.mkdirs(new Path(localDestFile).getParent());
File filePathTo = new File(localDestFile);
+ if (filePathTo.exists() && filePathTo.isDirectory()) {
+ filePathTo = new File(localDestFile + "/data");
+ }
BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile