Merge branch 'anbangx/fullstack_bidirection' of https://code.google.com/p/hyracks into anbangx/fullstack_bidirection
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index b405161..d8d4429 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -1,75 +1,52 @@
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.test.TestUtils;
import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
@SuppressWarnings("deprecation")
-public class TestPathMergeH3 {
- private static final int KMER_LENGTH = 5;
- private static final int READ_LENGTH = 8;
+public class TestPathMergeH3 extends GenomixMiniClusterTest {
+ protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected String HDFS_SEQUENCE = "/00-sequence/";
+ protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
+ protected String HDFS_MERGED = "/02-graphmerge/";
- private static final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
- private static final String HDFS_SEQUENCE = "/00-sequence/";
- private static final String HDFS_GRAPHBUILD = "/01-graphbuild/";
- private static final String HDFS_MERGED = "/02-graphmerge/";
-
- private static final String EXPECTED_ROOT = "src/test/resources/expected/";
- private static final String ACTUAL_ROOT = "src/test/resources/actual/";
- private static final String GRAPHBUILD_FILE = "result.graphbuild.txt";
- private static final String PATHMERGE_FILE = "result.mergepath.txt";
+ protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
+ protected String PATHMERGE_FILE = "result.mergepath.txt";
- private static final String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
-
- private MiniDFSCluster dfsCluster;
-
- private static JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private Driver driver;
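+ // instance initializer: configures the inherited mini-cluster fields at construction
+ // time, before the base class's @Before setUp() reads them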
+ {
+ KMER_LENGTH = 5;
+ READ_LENGTH = 8;
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED));
+ // we have to specify what kind of keys and values this job has
+ key = new NodeWritable(KMER_LENGTH);
+ value = NullWritable.get();
+ }
@Test
public void TestBuildGraph() throws Exception {
- copySequenceToDFS();
+ cleanUpOutput();
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
buildGraph();
}
-
+
// @Test
public void TestMergeOneIteration() throws Exception {
- copySequenceToDFS();
+ cleanUpOutput();
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
buildGraph();
MergePathsH3Driver h3 = new MergePathsH3Driver();
- h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, null, conf);
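+ // the base class's startHDFS() dumps the cluster config to ACTUAL_ROOT + "conf.xml";
+ // the driver is now pointed at that file rather than handed the JobConf directly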
+ h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, ACTUAL_ROOT + "conf.xml", null);
copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
}
@@ -83,133 +60,4 @@
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
}
-
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_ROOT));
- FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
- startHDFS();
-
- conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
- conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
- driver = new Driver(HyracksUtils.CC_HOST,
- HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- /*
- * Merge and copy a DFS directory to a local destination, converting to text if necessary. Also locally store the binary-formatted result if available.
- */
- private static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
- String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
- if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
- // for text files, just concatenate them together
- FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
- FileSystem.getLocal(new Configuration()), new Path(localDestFile),
- false, conf, null);
- } else {
- // file is binary
- // merge and store the binary format
- FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
- FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
- false, conf, null);
- // load the Node's and write them out as text locally
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(localDestFile).getParent());
- File filePathTo = new File(localDestFile);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i=0; i < java.lang.Integer.MAX_VALUE; i++) {
- Path path = new Path(hdfsSrcDir + "part-" + i);
- FileSystem dfs = FileSystem.get(conf);
- if (!dfs.exists(path)) {
- break;
- }
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- NodeWritable key = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH));
- NullWritable value = NullWritable.get();
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- }
-
- }
-
- private boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
- File dumped = new File(actualPath);
- if (poslistField != null) {
- TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
- } else {
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- }
- return true;
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
- conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
- conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
- conf.writeXml(confOutput);
- confOutput.close();
- }
-
- private void copySequenceToDFS() throws IOException {
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(LOCAL_SEQUENCE_FILE);
- Path dest = new Path(HDFS_SEQUENCE);
- dfs.mkdirs(dest);
- // dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
- }
-
- @BeforeClass
- public static void cleanUpEntry() throws IOException {
- // local cleanup
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(ACTUAL_ROOT))) {
- lfs.delete(new Path(ACTUAL_ROOT), true);
- }
- // dfs cleanup
- FileSystem dfs = FileSystem.get(conf);
- String[] paths = {HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED};
- for (String path : paths) {
- if (dfs.exists(new Path(path))) {
- dfs.delete(new Path(path), true);
- }
- }
- }
-
- @After
- public void tearDown() throws Exception {
- HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..544f5fc
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
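+ * Subclasses set KMER_LENGTH, READ_LENGTH, HDFS_PATHS, key, and value (typically in
+ * an instance initializer, as TestPathMergeH3 does above) and call cleanUpOutput()
+ * and copyLocalToDFS() at the start of each test.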
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+ protected int KMER_LENGTH = 5;
+ protected int READ_LENGTH = 8;
+
+ // subclasses should modify this to list the HDFS directories that need to be cleaned up
+ protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+ protected String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected String ACTUAL_ROOT = "src/test/resources/actual/";
+
+ protected String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+ protected MiniDFSCluster dfsCluster;
+ protected JobConf conf = new JobConf();
+ protected int numberOfNC = 1;
+ protected int numPartitionPerMachine = 1;
+ protected Driver driver;
+
+ protected Writable key;
+ protected Writable value;
+ protected MiniMRCluster mrCluster;
+
+ @Before
+ public void setUp() throws Exception {
+ cleanupStores();
+ HyracksUtils.init();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ startHDFS();
+
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+ conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+ driver = new Driver(HyracksUtils.CC_HOST,
+ HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+ * Also locally store the binary-formatted result if available.
+ */
+ protected void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+ String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+ if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+ FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+ false, conf, null);
+ } else {
+ // file is binary
+ // merge and store the binary format
+// FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+// FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
+// false, conf, null);
+ // load the Nodes and write them out as text locally
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ FileSystem dfs = FileSystem.get(conf);
+ // open the first partition just to discover the key/value classes for the local .bin copy
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, new Path(hdfsSrcDir + "part-0"), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile + ".bin"), reader.getKeyClass(), reader.getValueClass());
+ reader.close(); // close the probe reader; the loop below reopens each partition in turn
+
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ Path path = new Path(hdfsSrcDir + "part-" + i);
+ if (!dfs.exists(path)) {
+ break;
+ }
+ if (dfs.getFileStatus(path).getLen() == 0) {
+ continue;
+ }
+ reader = new SequenceFile.Reader(dfs, path, conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ writer.append(key, value);
+ }
+ reader.close();
+ }
+ writer.close();
+ bw.close();
+ }
+
+ }
+
+ protected boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ protected void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ protected void startHDFS() throws IOException {
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+ conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ //mrCluster = new MiniMRCluster(4, dfsCluster.getFileSystem().getUri().toString(), 2);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(ACTUAL_ROOT + "conf.xml")));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ protected void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(localSrc);
+ Path dest = new Path(hdfsDest);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+ }
+
+ /*
+ * Remove the local "actual" folder and any hdfs folders in use by this test
+ */
+ public void cleanUpOutput() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ FileSystem dfs = FileSystem.get(conf);
+ for (String path : HDFS_PATHS) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ HyracksUtils.deinit();
+ cleanupHDFS();
+ }
+
+ protected void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ //mrCluster.shutdown();
+ }
+}