simplify driver and test case for h4 graph building
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 155b999..f8cabee 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -16,90 +16,143 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+
 @SuppressWarnings("deprecation")
 public class MergePathsH4Driver {
 
-    private static class Options {
-        @Option(name = "-inputpath", usage = "the input path", required = true)
-        public String inputPath;
+	private static final String TO_MERGE = "toMerge";
+	private static final String COMPLETE = "complete";
+	private static final String UPDATES = "updates";
+	private String mergeOutput;
+	private String completeOutput;
+	private String updatesOutput;
 
-        @Option(name = "-outputpath", usage = "the output path", required = true)
-        public String outputPath;
+	private void setOutputPaths(String basePath, int mergeIteration) {
+		basePath = basePath.replaceAll("/$", ""); // strip trailing slash
+		mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+		completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
+		updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
+	}
 
-        @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
-        public String mergeResultPath;
+	private static class Options {
+		@Option(name = "-inputpath", usage = "the input path", required = true)
+		public String inputPath;
 
-        @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
-        public int numReducers;
+		@Option(name = "-outputpath", usage = "the output path", required = true)
+		public String outputPath;
 
-        @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
-        public int sizeKmer;
+		@Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+		public String mergeResultPath;
 
-        @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
-        public int mergeRound;
-        
-        @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
-        public String hadoopConf;
+		@Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+		public int numReducers;
 
-    }
+		@Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+		public int sizeKmer;
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
-            String defaultConfPath, JobConf defaultConf) throws IOException {
-        JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
-        if (defaultConfPath != null) {
-            baseConf.addResource(new Path(defaultConfPath));
-        }
-        baseConf.setNumReduceTasks(numReducers);
-        baseConf.setInt("sizeKmer", sizeKmer);
+		@Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
+		public int mergeRound;
 
-        FileSystem dfs = FileSystem.get(baseConf);
-        String prevOutput = inputPath;
-        dfs.delete(new Path(outputPath), true); // clear any previous output
+		@Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
+		public String hadoopConf;
 
-        String tmpOutputPath = "NO_JOBS_DONE";
-        boolean finalMerge = false;
-        for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
-            baseConf.setInt("iMerge", iMerge);
-            baseConf.setBoolean("finalMerge", finalMerge);
-            MergePathsH4 merger = new MergePathsH4();
-            tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
-            RunningJob job = merger.run(prevOutput, tmpOutputPath, baseConf);
-            if (job.getCounters().findCounter("genomix", "num_merged").getValue() == 0) {
-                if (!finalMerge) {
-                    // all of the pseudoheads have found each other.  H3 now behaves like H1
-                    finalMerge = true;
-                } else {
-                    // already in final merge stage and all paths were merged before.  We're done!
-                    break;
-                }
-            }
-        }
-        dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
-    }
+	}
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
-            String defaultConfPath) throws IOException {
-        run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
-    }
+	/*
+	 * Main driver for path merging. Given a graph, this driver runs
+	 * PathNodeInitial to ID heads and tails, then does up to @mergeRound
+	 * iterations of path merging. Updates during the merge are batch-processed
+	 * at the end in a final update job.
+	 */
+	public void run(String inputGraphPath, String outputGraphPath,
+			int numReducers, int sizeKmer, int mergeRound,
+			String defaultConfPath, JobConf defaultConf) throws IOException {
+		JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
+		if (defaultConfPath != null) {
+			baseConf.addResource(new Path(defaultConfPath));
+		}
+		baseConf.setNumReduceTasks(numReducers);
+		baseConf.setInt("sizeKmer", sizeKmer);
+		FileSystem dfs = FileSystem.get(baseConf);
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
-            JobConf defaultConf) throws IOException {
-        run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
-    }
+		int iMerge = 0;
 
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
-        MergePathsH4Driver driver = new MergePathsH4Driver();
-        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
-                null, null);
-    }
+		// identify head and tail nodes with pathnode initial
+		PathNodeInitial inith4 = new PathNodeInitial();
+		setOutputPaths(inputGraphPath, iMerge);
+		String prevToMergeOutput = inputGraphPath;
+		inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+
+		// several iterations of merging
+		MergePathsH4 merger = new MergePathsH4();
+		for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
+			prevToMergeOutput = mergeOutput;
+			setOutputPaths(inputGraphPath, iMerge);
+			merger.run(prevToMergeOutput, mergeOutput, completeOutput,
+					updatesOutput, baseConf);
+			if (dfs.listStatus(new Path(mergeOutput)).length == 0) {
+				// no output from previous run-- we are done!
+				break;
+			}
+		}
+
+		// finally, combine all the completed paths and update messages to
+		// create a single merged graph output
+		dfs.delete(new Path(outputGraphPath), true); // clear any previous
+														// output
+		// use all the "complete" and "update" outputs in addition to the final
+		// (possibly empty) toMerge directories
+		// as input to the final update step. This builds a comma-delim'ed
+		// String of said files.
+		final String lastMergeOutput = mergeOutput;
+		PathFilter updateFilter = new PathFilter() {
+			@Override
+			public boolean accept(Path arg0) {
+				String path = arg0.toString();
+				return path.contains(COMPLETE + "_i")
+						|| path.contains(UPDATES + "_i")
+						|| path.equals(lastMergeOutput);
+			}
+		};
+		StringBuilder sb = new StringBuilder();
+		String delim = "";
+		for (FileStatus file : dfs.globStatus(new Path("*"), updateFilter)) {
+			sb.append(delim).append(file);
+			delim = ",";
+		}
+		String finalInputs = sb.toString();
+		// TODO run the update iteration
+	}
+
+	public void run(String inputPath, String outputGraphPath, int numReducers,
+			int sizeKmer, int mergeRound, String defaultConfPath)
+			throws IOException {
+		run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
+				defaultConfPath, null);
+	}
+
+	public void run(String inputPath, String outputGraphPath, int numReducers,
+			int sizeKmer, int mergeRound, JobConf defaultConf)
+			throws IOException {
+		run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound,
+				null, defaultConf);
+	}
+
+	public static void main(String[] args) throws Exception {
+		Options options = new Options();
+		CmdLineParser parser = new CmdLineParser(options);
+		parser.parseArgument(args);
+		MergePathsH4Driver driver = new MergePathsH4Driver();
+		driver.run(options.inputPath, options.outputPath, options.numReducers,
+				options.sizeKmer, options.mergeRound, null, null);
+	}
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index cc922de..c04ba46 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -54,11 +54,12 @@
         }
         
         PathNodeInitial inith3 = new PathNodeInitial();
-        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
-        copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+        inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
+        copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+        copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
 
         MergePathsH3Driver h3 = new MergePathsH3Driver();
-        h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+        h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
         copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
     }
 
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 9e799f3..667c4b1 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -11,27 +11,25 @@
 
 import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
 import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
 import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 
 @SuppressWarnings("deprecation")
-public class TestPathMergeH4 extends GenomixMiniClusterTest {
-    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
-    protected String HDFS_SEQUENCE = "/00-sequence/";
-    protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
-    protected String HDFS_MARKPATHS = "/02-pathmark/";
-    protected String HDFS_MERGED = "/03-pathmerge/";
+public class TestPathMergeH4 extends HadoopMiniClusterTest {
+    protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+    protected final  String SEQUENCE = "/00-sequence/";
+    protected final String GRAPHBUILD = "/01-graphbuild/";
+    protected final String MERGED = "/02-pathmerge/";
     
-    protected String GRAPHBUILD_FILE = "graphbuild.result";
-    protected String PATHMARKS_FILE = "markpaths.result";
-    protected String PATHMERGE_FILE = "h4.mergepath.result";
-    protected boolean regenerateGraph = true;
+    protected final boolean regenerateGraph = true;
     
     {
         KMER_LENGTH = 5;
         READ_LENGTH = 8;
-        HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MARKPATHS, HDFS_MERGED));
+        HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
         conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
         conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
     }
@@ -40,33 +38,29 @@
     public void TestMergeOneIteration() throws Exception {
         cleanUpOutput();
         if (regenerateGraph) {
-            copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+            copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
             buildGraph();
-            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+            copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
         } else {
-            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+            copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
         }
         
-        PathNodeInitial inith4 = new PathNodeInitial();
-        inith4.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
-        copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
         MergePathsH4Driver h4 = new MergePathsH4Driver();
-        h4.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
-        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+        h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 1, conf);
+        copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
     }
 
 
 
     public void buildGraph() throws Exception {
         JobConf buildConf = new JobConf(conf);  // use a separate conf so we don't interfere with other jobs 
-        FileInputFormat.setInputPaths(buildConf, HDFS_SEQUENCE);
-        FileOutputFormat.setOutputPath(buildConf, new Path(HDFS_GRAPHBUILD));
-        buildConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
-        buildConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        driver.runJob(new GenomixJobConf(buildConf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        String fileFormat = buildConf.get(GenomixJobConf.OUTPUT_FORMAT);
-        boolean resultsAreText = GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat);
-        copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, resultsAreText, buildConf);
+        FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+        FileOutputFormat.setOutputPath(buildConf, new Path(GRAPHBUILD));
+        
+        GraphBuildingDriver tldriver = new GraphBuildingDriver();
+        tldriver.run(SEQUENCE, GRAPHBUILD, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+        
+        boolean resultsAreText = false;
+        copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
     }
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..2cd3b53
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,196 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+
+    // subclass should modify this to include the HDFS directories that should be cleaned up
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+    protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+    protected static MiniDFSCluster dfsCluster;
+    protected static MiniMRCluster mrCluster;
+    protected static FileSystem dfs;
+    protected static JobConf conf = new JobConf();
+    protected static int numberOfNC = 1;
+    protected static int numPartitionPerMachine = 1;
+
+    @BeforeClass
+    public static void setUpMiniCluster() throws Exception {
+        cleanupStores();
+        startHDFS();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+    }
+
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
+     * Also locally store the binary-formatted result if available.
+     */
+    protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf) throws IOException {
+        if (resultsAreText) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile), false, conf, null);
+        } else {
+            // file is binary
+            // save the entire binary output dir
+            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile + ".bindir"), false, conf);
+            
+            // chomp through output files
+            FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
+            FileStatus validFile = null;
+            for (FileStatus f : files) {
+            	if (f.getLen() != 0) {
+            		validFile = f;
+            		break;
+            	}
+            }
+            if (validFile == null) {
+                throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+            }
+
+            // also load the Nodes and write them out as text locally. 
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+                    + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            for (FileStatus f : files) {
+                if (f.getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+
+    }
+
+    protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath);
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected static void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected static void startHDFS() throws IOException {
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        //        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+        System.out.println(dfs.getUri().toString());
+
+        DataOutputStream confOutput = new DataOutputStream(
+                new FileOutputStream(new File(HADOOP_CONF)));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+
+    protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(new Path(localSrc), dest);
+    }
+
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        cleanupHDFS();
+    }
+
+    protected static void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        mrCluster.shutdown();
+    }
+}