simplify driver and test case for h4 path merging; run PathNodeInitial from inside the driver and add a HadoopMiniClusterTest base class
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 155b999..f8cabee 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -16,90 +16,143 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+
@SuppressWarnings("deprecation")
public class MergePathsH4Driver {
- private static class Options {
- @Option(name = "-inputpath", usage = "the input path", required = true)
- public String inputPath;
+ private static final String TO_MERGE = "toMerge";
+ private static final String COMPLETE = "complete";
+ private static final String UPDATES = "updates";
+ private String mergeOutput;
+ private String completeOutput;
+ private String updatesOutput;
- @Option(name = "-outputpath", usage = "the output path", required = true)
- public String outputPath;
+ private void setOutputPaths(String basePath, int mergeIteration) {
+ basePath = basePath.replaceAll("/$", ""); // strip trailing slash
+ mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+ completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
+ updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
+ }
- @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
- public String mergeResultPath;
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
- @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
- public int numReducers;
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
- @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
- public int sizeKmer;
+ @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+ public String mergeResultPath;
- @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
- public int mergeRound;
-
- @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
- public String hadoopConf;
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
- }
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
- String defaultConfPath, JobConf defaultConf) throws IOException {
- JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
- if (defaultConfPath != null) {
- baseConf.addResource(new Path(defaultConfPath));
- }
- baseConf.setNumReduceTasks(numReducers);
- baseConf.setInt("sizeKmer", sizeKmer);
+ @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
+ public int mergeRound;
- FileSystem dfs = FileSystem.get(baseConf);
- String prevOutput = inputPath;
- dfs.delete(new Path(outputPath), true); // clear any previous output
+ @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
+ public String hadoopConf;
- String tmpOutputPath = "NO_JOBS_DONE";
- boolean finalMerge = false;
- for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
- baseConf.setInt("iMerge", iMerge);
- baseConf.setBoolean("finalMerge", finalMerge);
- MergePathsH4 merger = new MergePathsH4();
- tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
- RunningJob job = merger.run(prevOutput, tmpOutputPath, baseConf);
- if (job.getCounters().findCounter("genomix", "num_merged").getValue() == 0) {
- if (!finalMerge) {
- // all of the pseudoheads have found each other. H3 now behaves like H1
- finalMerge = true;
- } else {
- // already in final merge stage and all paths were merged before. We're done!
- break;
- }
- }
- }
- dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
- }
+ }
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
- String defaultConfPath) throws IOException {
- run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
- }
+ /*
+ * Main driver for path merging. Given a graph, this driver runs
+ * PathNodeInitial to identify heads and tails, then does up to mergeRound
+ * iterations of path merging. Updates generated during the merge are
+ * batch-processed at the end in a final update job.
+ */
+ public void run(String inputGraphPath, String outputGraphPath,
+ int numReducers, int sizeKmer, int mergeRound,
+ String defaultConfPath, JobConf defaultConf) throws IOException {
+ JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
+ if (defaultConfPath != null) {
+ baseConf.addResource(new Path(defaultConfPath));
+ }
+ baseConf.setNumReduceTasks(numReducers);
+ baseConf.setInt("sizeKmer", sizeKmer);
+ FileSystem dfs = FileSystem.get(baseConf);
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
- JobConf defaultConf) throws IOException {
- run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
- }
+ int iMerge = 0;
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
- MergePathsH4Driver driver = new MergePathsH4Driver();
- driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
- null, null);
- }
+ // identify head and tail nodes with PathNodeInitial
+ PathNodeInitial inith4 = new PathNodeInitial();
+ setOutputPaths(inputGraphPath, iMerge);
+ String prevToMergeOutput = inputGraphPath;
+ inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+
+ // several iterations of merging
+ MergePathsH4 merger = new MergePathsH4();
+ for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
+ prevToMergeOutput = mergeOutput;
+ setOutputPaths(inputGraphPath, iMerge);
+ merger.run(prevToMergeOutput, mergeOutput, completeOutput,
+ updatesOutput, baseConf);
+ if (dfs.listStatus(new Path(mergeOutput)).length == 0) {
+ // this iteration produced no nodes to merge -- we're done!
+ break;
+ }
+ }
+
+ // finally, combine all the completed paths and update messages to
+ // create a single merged graph output
+ dfs.delete(new Path(outputGraphPath), true); // clear any previous output
+
+ // use all the "complete" and "updates" outputs, plus the final
+ // (possibly empty) toMerge directory, as input to the final update
+ // step. The loop below builds a comma-delimited String of those
+ // paths.
+ final String lastMergeOutput = mergeOutput;
+ PathFilter updateFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path arg0) {
+ String path = arg0.toString();
+ return path.contains(COMPLETE + "_i")
+ || path.contains(UPDATES + "_i")
+ || path.equals(lastMergeOutput);
+ }
+ };
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ for (FileStatus file : dfs.globStatus(new Path("*"), updateFilter)) {
+ sb.append(delim).append(file.getPath());
+ delim = ",";
+ }
+ String finalInputs = sb.toString();
+ // TODO run the update iteration
+ }
+
+ public void run(String inputPath, String outputGraphPath, int numReducers,
+ int sizeKmer, int mergeRound, String defaultConfPath)
+ throws IOException {
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
+ defaultConfPath, null);
+ }
+
+ public void run(String inputPath, String outputGraphPath, int numReducers,
+ int sizeKmer, int mergeRound, JobConf defaultConf)
+ throws IOException {
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
+ null, defaultConf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ MergePathsH4Driver driver = new MergePathsH4Driver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers,
+ options.sizeKmer, options.mergeRound, null, null);
+ }
}
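
For illustration only (this sketch is not part of the patch): the driver above derives its per-iteration output directories as siblings of the input graph path via setOutputPaths(), and the final update step is meant to consume the complete/updates directories plus the last (possibly empty) toMerge directory. The standalone Java sketch below just prints that naming scheme; the base path "/02-graph" and the iteration count are made up for illustration.

    // Standalone sketch (not part of the patch) of the per-iteration output naming
    // produced by MergePathsH4Driver.setOutputPaths(). The base path "/02-graph" and
    // the iteration count are hypothetical.
    public class H4OutputNamingSketch {
        public static void main(String[] args) {
            String basePath = "/02-graph/".replaceAll("/$", ""); // strip trailing slash, as the driver does
            for (int iMerge = 0; iMerge <= 2; iMerge++) {
                System.out.println(basePath + "_toMerge_i" + iMerge);  // nodes that may still be merged
                System.out.println(basePath + "_complete_i" + iMerge); // fully merged paths
                System.out.println(basePath + "_updates_i" + iMerge);  // neighbor-update messages
            }
            // iteration 0 holds the PathNodeInitial output; the final update step would read
            // all complete/updates directories plus the last (possibly empty) toMerge directory.
        }
    }
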
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index cc922de..c04ba46 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -54,11 +54,12 @@
}
PathNodeInitial inith3 = new PathNodeInitial();
- inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
- copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+ inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
+ copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE + ".toMerge", false, conf);
+ copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE + ".complete", false, conf);
MergePathsH3Driver h3 = new MergePathsH3Driver();
- h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+ h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 9e799f3..667c4b1 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -11,27 +11,25 @@
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
@SuppressWarnings("deprecation")
-public class TestPathMergeH4 extends GenomixMiniClusterTest {
- protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
- protected String HDFS_SEQUENCE = "/00-sequence/";
- protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
- protected String HDFS_MARKPATHS = "/02-pathmark/";
- protected String HDFS_MERGED = "/03-pathmerge/";
+public class TestPathMergeH4 extends HadoopMiniClusterTest {
+ protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected final String SEQUENCE = "/00-sequence/";
+ protected final String GRAPHBUILD = "/01-graphbuild/";
+ protected final String MERGED = "/02-pathmerge/";
- protected String GRAPHBUILD_FILE = "graphbuild.result";
- protected String PATHMARKS_FILE = "markpaths.result";
- protected String PATHMERGE_FILE = "h4.mergepath.result";
- protected boolean regenerateGraph = true;
+ protected final boolean regenerateGraph = true;
{
KMER_LENGTH = 5;
READ_LENGTH = 8;
- HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MARKPATHS, HDFS_MERGED));
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
}
@@ -40,33 +38,29 @@
public void TestMergeOneIteration() throws Exception {
cleanUpOutput();
if (regenerateGraph) {
- copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
buildGraph();
- copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+ copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
} else {
- copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+ copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
}
- PathNodeInitial inith4 = new PathNodeInitial();
- inith4.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
- copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
MergePathsH4Driver h4 = new MergePathsH4Driver();
- h4.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
- copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+ h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 1, conf);
+ copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
}
public void buildGraph() throws Exception {
JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
- FileInputFormat.setInputPaths(buildConf, HDFS_SEQUENCE);
- FileOutputFormat.setOutputPath(buildConf, new Path(HDFS_GRAPHBUILD));
- buildConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
- buildConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- driver.runJob(new GenomixJobConf(buildConf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- String fileFormat = buildConf.get(GenomixJobConf.OUTPUT_FORMAT);
- boolean resultsAreText = GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat);
- copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, resultsAreText, buildConf);
+ FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+ FileOutputFormat.setOutputPath(buildConf, new Path(GRAPHBUILD));
+
+ GraphBuildingDriver tldriver = new GraphBuildingDriver();
+ tldriver.run(SEQUENCE, GRAPHBUILD, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+
+ boolean resultsAreText = false;
+ copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
}
}
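
For illustration only (not part of the patch): the revised test copies the merged output locally but no longer asserts anything about it. A follow-up check inside TestPathMergeH4 could look roughly like the sketch below, using the checkResults helper added in HadoopMiniClusterTest; the expected-results file name and the position-list column index are hypothetical.

    // Hypothetical verification step inside TestPathMergeH4 (not part of the patch).
    // The expected-results file name and the position-list column index are made up.
    public void verifyMergedGraph() throws Exception {
        String expected = EXPECTED_ROOT + "04-pathmerge.expected";
        String actual = ACTUAL_ROOT + MERGED; // where TestMergeOneIteration copied the merged graph
        // compare, ignoring ordering inside the position-list field (column 1 here)
        org.junit.Assert.assertTrue(checkResults(expected, actual, new int[] { 1 }));
    }
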
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..2cd3b53
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,196 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+ protected int KMER_LENGTH = 5;
+ protected int READ_LENGTH = 8;
+
+ // subclass should modify this to include the HDFS directories that should be cleaned up
+ protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+ protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+ protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+ protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+ protected static MiniDFSCluster dfsCluster;
+ protected static MiniMRCluster mrCluster;
+ protected static FileSystem dfs;
+ protected static JobConf conf = new JobConf();
+ protected static int numberOfNC = 1;
+ protected static int numPartitionPerMachine = 1;
+
+ @BeforeClass
+ public static void setUpMiniCluster() throws Exception {
+ cleanupStores();
+ startHDFS();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+ * Also locally store the binary-formatted result if available.
+ */
+ protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf) throws IOException {
+ if (resultsAreText) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile), false, conf, null);
+ } else {
+ // file is binary
+ // save the entire binary output dir
+ FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile + ".bindir"), false, conf);
+
+ // chomp through output files
+ FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
+ FileStatus validFile = null;
+ for (FileStatus f : files) {
+ if (f.getLen() != 0) {
+ validFile = f;
+ break;
+ }
+ }
+ if (validFile == null) {
+ throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+ }
+
+ // also load the Nodes and write them out as text locally.
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+ + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ for (FileStatus f : files) {
+ if (f.getLen() == 0) {
+ continue;
+ }
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ writer.append(key, value);
+
+ }
+ reader.close();
+ }
+ writer.close();
+ bw.close();
+ }
+
+ }
+
+ protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ protected static void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ protected static void startHDFS() throws IOException {
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+ System.out.println(dfs.getUri().toString());
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF)));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+ Path dest = new Path(hdfsDest);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(new Path(localSrc), dest);
+ }
+
+ /*
+ * Remove the local "actual" folder and any hdfs folders in use by this test
+ */
+ public void cleanUpOutput() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ for (String path : HDFS_PATHS) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cleanupHDFS();
+ }
+
+ protected static void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ mrCluster.shutdown();
+ }
+}
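
For illustration only (not part of the patch): a minimal subclass showing how the HadoopMiniClusterTest helpers above are intended to be used. The class name, HDFS directory name, and local result file name are hypothetical; the helpers (cleanUpOutput, copyLocalToDFS, copyResultsToLocal) and the shared conf are the ones defined in the new base class.

    package edu.uci.ics.genomix.hadoop.pmcommon;

    import java.util.ArrayList;
    import java.util.Arrays;

    import org.junit.Test;

    // Hypothetical example subclass; paths and file names are made up for illustration.
    public class ExampleMiniClusterTest extends HadoopMiniClusterTest {
        private final String INPUT = "/00-input/";

        {
            // register HDFS paths so cleanUpOutput() removes them between runs
            HDFS_PATHS = new ArrayList<String>(Arrays.asList(INPUT));
        }

        @Test
        public void testCopyRoundTrip() throws Exception {
            cleanUpOutput();
            // stage a local text file into the mini-DFS cluster...
            copyLocalToDFS("src/test/resources/data/webmap/text.txt", INPUT);
            // ...and merge it back to the local "actual" directory as text
            copyResultsToLocal(INPUT, ACTUAL_ROOT + "example.result", true, conf);
        }
    }
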