Fix H4 merge direction and input paths; rework test templates
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 5f72906..abc8ff1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -173,7 +173,7 @@
/*
* remove the first instance of @toRemove. Uses a linear scan. Throws an exception if not in this list.
*/
- public void remove(PositionWritable toRemove) {
+ public void remove(PositionWritable toRemove, boolean ignoreMissing) {
Iterator<PositionWritable> posIterator = this.iterator();
while (posIterator.hasNext()) {
if(toRemove.equals(posIterator.next())) {
@@ -181,7 +181,13 @@
return;
}
}
- //throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+ if (!ignoreMissing) {
+ throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+ }
+ }
+
+ public void remove(PositionWritable toRemove) {
+ remove(toRemove, false);
}
@Override
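A minimal usage sketch of the new overload pair (the setup call is assumed, not shown in this diff):

    // Sketch: strict vs. tolerant removal. append() is an assumed setter.
    PositionListWritable positions = new PositionListWritable();
    PositionWritable p = new PositionWritable();
    positions.append(p);        // hypothetical setup
    positions.remove(p, true);  // tolerant: a missing element is silently ignored
    positions.remove(p);        // strict: p is now absent, so this throws
                                //   ArrayIndexOutOfBoundsException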
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 21e8251..595acf1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -190,11 +190,11 @@
if (curHead) {
if (mergeableNext && !nextHead) {
// merge forward
- mergeMsgFlag |= nextDir;
+ mergeMsgFlag |= NodeWithFlagWritable.mirrorDirection(nextDir);
mergeDir = MergeDir.FORWARD;
} else if (mergeablePrev && !prevHead) {
// merge backwards
- mergeMsgFlag |= prevDir;
+ mergeMsgFlag |= NodeWithFlagWritable.mirrorDirection(prevDir);
mergeDir = MergeDir.BACKWARD;
}
} else {
@@ -203,21 +203,21 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- mergeMsgFlag |= nextDir;
+ mergeMsgFlag |= NodeWithFlagWritable.mirrorDirection(nextDir);
mergeDir = MergeDir.FORWARD;
}
} else if (!mergeablePrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) > 0) {
// merge towards tail in forward dir
- mergeMsgFlag |= nextDir;
+ mergeMsgFlag |= NodeWithFlagWritable.mirrorDirection(nextDir);
mergeDir = MergeDir.FORWARD;
}
} else if (!mergeableNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) > 0) {
// merge towards tail in reverse dir
- mergeMsgFlag |= prevDir;
+ mergeMsgFlag |= NodeWithFlagWritable.mirrorDirection(prevDir);
mergeDir = MergeDir.BACKWARD;
}
}
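Every direction flag above is now passed through NodeWithFlagWritable.mirrorDirection before being OR'd into mergeMsgFlag: the flag is shipped to the neighbor, so it must describe the edge from the recipient's perspective rather than the sender's. The implementation is not part of this diff; a plausible sketch, assuming the conventional four de Bruijn direction flags, is:

    // Hypothetical sketch -- flag names and the mask are assumed, not from this diff.
    public static byte mirrorDirection(byte dir) {
        switch (dir & MessageFlag.DIR_MASK) {
            case MessageFlag.DIR_FF: return MessageFlag.DIR_RR; // A -FF-> B reads as B -RR-> A
            case MessageFlag.DIR_RR: return MessageFlag.DIR_FF;
            case MessageFlag.DIR_FR: return MessageFlag.DIR_FR; // FR and RF are self-mirrors
            case MessageFlag.DIR_RF: return MessageFlag.DIR_RF;
            default: throw new IllegalArgumentException("unrecognized direction: " + dir);
        }
    }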
@@ -341,10 +341,10 @@
throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ outputValue.getNode() + " current self: " + inputValue.getNode());
if (inMsg == MessageFlag.MSG_SELF) {
- outPosn.set(outputValue.getNode().getNodeID());
+ outPosn.set(inputValue.getNode().getNodeID());
} else if (inMsg == MessageFlag.MSG_UPDATE_MERGE) {
// merge messages are sent to their merge recipient
- outPosn.set(outputValue.getNode().getListFromDir(inMsg).getPosition(0));
+ outPosn.set(inputValue.getNode().getListFromDir(inMsg).getPosition(0));
} else {
throw new IOException("Unrecongized MessageFlag MSG: " + inMsg);
}
@@ -436,7 +436,7 @@
if (sawCurNode)
throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ outputValue.getNode() + " current self: " + inputValue.getNode());
- outputKey.set(outputValue.getNode().getNodeID());
+ outputKey.set(inputValue.getNode().getNodeID());
outputValue.set(inFlag, inputValue.getNode());
sawCurNode = true;
break;
@@ -494,7 +494,7 @@
conf.setOutputValueClass(NodeWithFlagWritable.class);
// step 1: decide merge dir and send updates
- FileInputFormat.addInputPaths(conf, inputPath);
+ FileInputFormat.setInputPaths(conf, inputPath);
String outputUpdatesTmp = "h4.updatesProcessed." + new Random().nextDouble() + ".tmp"; // random filename
FileOutputFormat.setOutputPath(conf, new Path(outputUpdatesTmp));
dfs.delete(new Path(outputUpdatesTmp), true);
@@ -503,10 +503,10 @@
RunningJob job = JobClient.runJob(conf);
// step 2: process merges
- FileInputFormat.addInputPaths(conf, outputUpdatesTmp);
- for (Path out : FileInputFormat.getInputPaths(conf)) {
- System.out.println(out);
- }
+ FileInputFormat.setInputPaths(conf, outputUpdatesTmp);
+// for (Path out : FileInputFormat.getInputPaths(conf)) {
+// System.out.println(out);
+// }
Path outputMergeTmp = new Path("h4.mergeProcessed." + new Random().nextDouble() + ".tmp"); // random filename
FileOutputFormat.setOutputPath(conf, outputMergeTmp);
MultipleOutputs.addNamedOutput(conf, H4MergeReducer.TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
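The switch from addInputPaths to setInputPaths matters because the same JobConf is reused for step 2: addInputPaths appends to the existing mapred.input.dir list, so step 2 would have read step 1's input alongside its own, while setInputPaths replaces the list. In isolation (paths hypothetical):

    // org.apache.hadoop.mapred.FileInputFormat, old mapred API:
    FileInputFormat.setInputPaths(conf, "step1-in");  // input list: [step1-in]
    FileInputFormat.addInputPaths(conf, "step2-in");  // appends:    [step1-in, step2-in]
    FileInputFormat.setInputPaths(conf, "step2-in");  // replaces:   [step2-in]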
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 98fe5f6..8f5a3ac 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -114,6 +114,8 @@
}
if (!mergeComplete) {
// if the merge didn't finish, we have to do one final iteration to convert back into (NodeWritable, NullWritable) pairs
+ prevToMergeOutput = mergeOutput;
+ setOutputPaths(inputGraphPath, iMerge);
ConvertGraphFromNodeWithFlagToNodeWritable converter = new ConvertGraphFromNodeWithFlagToNodeWritable();
converter.run(prevToMergeOutput, completeOutput, baseConf);
completeOutputs.add(completeOutput);
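The two added lines rebind prevToMergeOutput to the last round's merge output and recompute the per-iteration paths, so the final conversion pass reads the most recent data rather than a stale path from an earlier round. A hedged sketch of the surrounding flow (loop shape and helper names assumed from context, not verbatim source):

    for (iMerge = 1; iMerge <= maxRounds && !mergeComplete; iMerge++) {
        setOutputPaths(inputGraphPath, iMerge);              // derives mergeOutput et al.
        mergeComplete = runOneMergeRound(prevToMergeOutput); // hypothetical helper
    }
    // The loop exits without rebinding, so the converter must be pointed
    // at the last round's output explicitly:
    prevToMergeOutput = mergeOutput;
    setOutputPaths(inputGraphPath, iMerge);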
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index fc369ec..1336bf1 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -129,7 +129,7 @@
} else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
// this message wants to merge node with updateNode.
// the direction flag indicates node's relationship with updateNode
- node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
+// node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
// pass along H/T information from the merging node. flipping H ->T, T -> H
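mergeWithKmerInDir extends this node's k-mer with updateNode's k-mer; adjacent k-mers in a de Bruijn graph overlap in k-1 bases, so the merge appends only the non-overlapping portion. An illustration with plain strings standing in for the binary k-mer type (k = 5):

    String cur  = "ACGTA";
    String next = "CGTAC";                    // shares cur's last k-1 = 4 bases
    String merged = cur + next.substring(4);  // "ACGTAC"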
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index ef9af84..77a43bf 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -266,7 +266,8 @@
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
- FileInputFormat.addInputPaths(conf, inputPath);
+ FileInputFormat.setInputPaths(conf, inputPath);
+// FileInputFormat.getInputPaths(conf);
FileOutputFormat.setOutputPath(conf, new Path(toMergeOutput));
MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
@@ -280,8 +281,12 @@
RunningJob job = JobClient.runJob(conf);
// move the tmp outputs to the arg-spec'ed dirs
- dfs.rename(new Path(toMergeOutput + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
- dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
+ if (!dfs.rename(new Path(toMergeOutput + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput))) {
+ dfs.mkdirs(new Path(toUpdateOutput));
+ }
+ if (!dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput))) {
+ dfs.mkdirs(new Path(completeOutput));
+ }
return job;
}
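FileSystem.rename returns false rather than throwing when the source path does not exist, which happens whenever the job emits no records to a named output; the fallback creates an empty directory so downstream consumers of toUpdateOutput and completeOutput still find a valid path. The idiom in isolation (paths hypothetical):

    Path src = new Path("tmpOut/" + NAMED_OUTPUT);
    Path dst = new Path("finalOut");
    if (!dfs.rename(src, dst)) {
        // the named output produced nothing; hand consumers an empty dir instead
        dfs.mkdirs(dst);
    }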
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 2836981..a686fed 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -16,35 +16,30 @@
@SuppressWarnings("deprecation")
public class TestPathMergeH4 extends HadoopMiniClusterTest {
- protected String SEQUENCE = "/sequence/";
- protected String GRAPHBUILD = "/graphbuild-unmerged/";
- protected String MERGED = "/pathmerge/";
- protected String LOCAL_SEQUENCE_FILE;
- protected String readsFile;
- protected boolean regenerateGraph;
- {
- HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
- }
-
- public void setupTestConf(int kmerLength, int readLength, boolean regenerateGraph, String readsFile) {
+ protected String INPUT_GRAPH;
+ protected String OUTPUT_GRAPH;
+ protected String localPath;
+
+ public void setupTestConf(int kmerLength, int readLength, String inputDir) throws IOException {
KMER_LENGTH = kmerLength;
READ_LENGTH = readLength;
- this.readsFile = readsFile;
- this.regenerateGraph = regenerateGraph;
- LOCAL_SEQUENCE_FILE = DATA_ROOT + SEQUENCE + readsFile;
+ INPUT_GRAPH = "/input" + inputDir;
+ OUTPUT_GRAPH = "/output" + inputDir;
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(OUTPUT_GRAPH));
+ copyLocalToDFS(INPUT_ROOT + inputDir, INPUT_GRAPH);
}
@Test
public void testTwoReads() throws Exception {
- setupTestConf(5, 8, true, "tworeads.txt");
-// testPathNode();
+ setupTestConf(5, 8, "/graphs/pathmerge/singleread");
+ testPathNode();
testMergeOneIteration();
-// testMergeToCompletion();
+ testMergeToCompletion();
}
-
-// @Test
+
+ // @Test
public void testSimpleText() throws Exception {
- setupTestConf(5, 8, false, "text.txt");
+ setupTestConf(5, 8, "text.txt");
testPathNode();
testMergeOneIteration();
// testMergeToCompletion();
@@ -52,47 +47,28 @@
public void testPathNode() throws IOException {
cleanUpOutput();
- prepareGraph();
-
// identify head and tail nodes with pathnode initial
PathNodeInitial inith4 = new PathNodeInitial();
- inith4.run(GRAPHBUILD, "/toMerge", "/toUpdate", "/completed", conf);
- copyResultsToLocal("/toMerge", ACTUAL_ROOT + "path_toMerge", false, conf);
- copyResultsToLocal("/toUpdate", ACTUAL_ROOT + "path_toUpdate", false, conf);
- copyResultsToLocal("/completed", ACTUAL_ROOT + "path_completed", false, conf);
+ inith4.run(INPUT_GRAPH, OUTPUT_GRAPH + "/toMerge", OUTPUT_GRAPH + "/toUpdate", OUTPUT_GRAPH + "/completed", conf);
}
public void testMergeOneIteration() throws Exception {
cleanUpOutput();
- prepareGraph();
MergePathsH4Driver h4 = new MergePathsH4Driver();
- String outputs = h4.run(GRAPHBUILD, 2, KMER_LENGTH, 1, conf);
- int i=0;
+ String outputs = h4.run(INPUT_GRAPH, 2, KMER_LENGTH, 1, conf);
for (String out : outputs.split(",")) {
- copyResultsToLocal(out, ACTUAL_ROOT + MERGED + i++, false, conf);
+ copyResultsToLocal(out, out.replaceFirst("/input/", ACTUAL_ROOT), false, conf);
}
}
-
- public void buildGraph() throws IOException {
- JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
- FileInputFormat.setInputPaths(buildConf, SEQUENCE);
- FileOutputFormat.setOutputPath(buildConf, new Path(GRAPHBUILD));
-
- GraphBuildingDriver tldriver = new GraphBuildingDriver();
- tldriver.run(SEQUENCE, GRAPHBUILD, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
-
- boolean resultsAreText = false;
- copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
- }
- private void prepareGraph() throws IOException {
- if (regenerateGraph) {
- copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
- buildGraph();
- copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
- } else {
- copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
+ public void testMergeToCompletion() throws Exception {
+ cleanUpOutput();
+
+ MergePathsH4Driver h4 = new MergePathsH4Driver();
+ String outputs = h4.run(INPUT_GRAPH, 2, KMER_LENGTH, 50, conf);
+ for (String out : outputs.split(",")) {
+ copyResultsToLocal(out, out.replaceFirst("/input/", ACTUAL_ROOT), false, conf);
}
- }
+ }
}
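The reworked template stages a pre-built binary graph from src/test/resources/input into HDFS instead of regenerating it through GraphBuildingDriver on every run. A new test under this template would look like (the input directory is hypothetical):

    @Test
    public void testAnotherGraph() throws Exception {
        setupTestConf(5, 8, "/graphs/pathmerge/anothergraph"); // copies the graph to /input/...
        testPathNode();
        testMergeOneIteration();
        testMergeToCompletion();
    }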
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
index 4e6a5f9..714d77e 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -89,10 +89,10 @@
FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + "*"));
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, files[0].getPath(), conf);
String destBinDir = localDestFile.substring(0, localDestFile.lastIndexOf("."));
- FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
- new Path(destBinDir), false, conf);
-// SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
-// + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+// FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+// new Path(destBinDir), false, conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+ + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
@@ -109,12 +109,12 @@
bw.write(key.toString() + "\t" + value.toString());
System.out.println(key.toString() + "\t" + value.toString());
bw.newLine();
-// writer.append(key, value);
+ writer.append(key, value);
}
reader.close();
}
-// writer.close();
+ writer.close();
bw.close();
}
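The re-enabled writer uses a SequenceFile.Writer constructor that is deprecated in later Hadoop releases; if the MiniCluster is ever upgraded, the factory method is the likely drop-in replacement (sketch only):

    SequenceFile.Writer writer = SequenceFile.createWriter(lfs, new JobConf(),
            new Path(localDestFile + ".binmerge"), reader.getKeyClass(), reader.getValueClass());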
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index 89f9e96..8a8501d 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -19,6 +19,8 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.util.ReflectionUtils;
@@ -27,6 +29,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
+import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
import edu.uci.ics.genomix.hyracks.test.TestUtils;
/*
@@ -42,7 +45,7 @@
protected static final String EXPECTED_ROOT = "src/test/resources/expected/";
protected static final String ACTUAL_ROOT = "src/test/resources/actual/";
- protected static final String DATA_ROOT = "src/test/resources/data/";
+ protected static final String INPUT_ROOT = "src/test/resources/input/";
protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
@@ -62,12 +65,16 @@
FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
}
+ protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf) throws IOException {
+ copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, conf, true);
+ }
/*
* Merge and copy a DFS directory to a local destination, converting to text if necessary.
* Also locally store the binary-formatted result if available.
*/
protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
- Configuration conf) throws IOException {
+ Configuration conf, boolean ignoreZeroOutputs) throws IOException {
if (resultsAreText) {
// for text files, just concatenate them together
FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
@@ -88,7 +95,15 @@
}
}
if (validFile == null) {
- throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+ if (ignoreZeroOutputs) {
+ // just make a dummy output dir
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ return;
+ } else {
+ throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+ }
}
// also load the Nodes and write them out as text locally.
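With the lenient default above, existing four-argument call sites compile unchanged; callers that still want to fail fast pass false explicitly:

    // Lenient (new default): an all-empty job output yields an empty local dir.
    copyResultsToLocal("/out/toMerge", ACTUAL_ROOT + "toMerge", false, conf);
    // Strict: throws IOException when every part file is empty.
    copyResultsToLocal("/out/toMerge", ACTUAL_ROOT + "toMerge", false, conf, false);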
@@ -168,7 +183,10 @@
protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
Path dest = new Path(hdfsDest);
dfs.mkdirs(dest);
- dfs.copyFromLocalFile(new Path(localSrc), dest);
+ System.out.println("copying from " + localSrc + " to " + dest);
+ for (File f : new File(localSrc).listFiles()) {
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
}
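The per-file loop keeps copyFromLocalFile from nesting the whole source directory under dest, but File.listFiles() returns null when localSrc is a plain file; a defensive variant (a sketch, not what the commit does):

    File src = new File(localSrc);
    File[] children = src.isDirectory() ? src.listFiles() : new File[] { src };
    for (File f : children) {
        dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
    }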
/*
@@ -197,4 +215,27 @@
dfsCluster.shutdown();
mrCluster.shutdown();
}
+
+// public void buildGraph() throws IOException {
+// JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
+// FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+// FileOutputFormat.setOutputPath(buildConf, new Path(INPUT_GRAPH));
+//
+// GraphBuildingDriver tldriver = new GraphBuildingDriver();
+// tldriver.run(SEQUENCE, INPUT_GRAPH, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+//
+// boolean resultsAreText = true;
+// copyResultsToLocal(INPUT_GRAPH, ACTUAL_ROOT + INPUT_GRAPH, resultsAreText, buildConf);
+// }
+//
+// private void prepareGraph() throws IOException {
+// if (regenerateGraph) {
+// copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+// buildGraph();
+// copyLocalToDFS(ACTUAL_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+// } else {
+// copyLocalToDFS(EXPECTED_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+// }
+// }
+
}