Merge branch 'anbangx/fullstack_bidirection' of https://code.google.com/p/hyracks into anbangx/fullstack_bidirection
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index b405161..d8d4429 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -1,75 +1,52 @@
 package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
 
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.test.TestUtils;
 import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
 
 @SuppressWarnings("deprecation")
-public class TestPathMergeH3 {
-    private static final int KMER_LENGTH = 5;
-    private static final int READ_LENGTH = 8;
+public class TestPathMergeH3 extends GenomixMiniClusterTest {
+    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
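+    // HDFS staging directories for each pipeline stage: sequence load -> graph build -> path merge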
+    protected String HDFS_SEQUENCE = "/00-sequence/";
+    protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
+    protected String HDFS_MERGED = "/02-graphmerge/";
     
-    private static final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
-    private static final String HDFS_SEQUENCE = "/00-sequence/";
-    private static final String HDFS_GRAPHBUILD = "/01-graphbuild/";
-    private static final String HDFS_MERGED = "/02-graphmerge/";
-
-    private static final String EXPECTED_ROOT = "src/test/resources/expected/";
-    private static final String ACTUAL_ROOT = "src/test/resources/actual/";
-    private static final String GRAPHBUILD_FILE = "result.graphbuild.txt";
-    private static final String PATHMERGE_FILE = "result.mergepath.txt";
+    protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
+    protected String PATHMERGE_FILE = "result.mergepath.txt";
     
-    private static final String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
-    
-    private MiniDFSCluster dfsCluster;
-
-    private static JobConf conf = new JobConf();
-    private int numberOfNC = 2;
-    private int numPartitionPerMachine = 1;
-
-    private Driver driver;
+    {
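+        // instance initializers run at construction time, before the inherited @Before
+        // setUp(), so these overrides are in place when the mini-cluster is configured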
+        KMER_LENGTH = 5;
+        READ_LENGTH = 8;
+        HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED));
+        // specify the key/value types of this job's output so the base class can deserialize results
+        key = new NodeWritable(KMER_LENGTH);
+        value = NullWritable.get();
+    }
 
     @Test
     public void TestBuildGraph() throws Exception {
-        copySequenceToDFS();
+        cleanUpOutput();
+        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
         buildGraph();
     }
-    
+
 //    @Test
     public void TestMergeOneIteration() throws Exception {
-        copySequenceToDFS();
+        cleanUpOutput();
+        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
         buildGraph();
         MergePathsH3Driver h3 = new MergePathsH3Driver();
-        h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, null, conf);
+        h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, ACTUAL_ROOT + "conf.xml", null);
         copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
     }
 
@@ -83,133 +60,4 @@
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
     }
-
-    @Before
-    public void setUp() throws Exception {
-        cleanupStores();
-        HyracksUtils.init();
-        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
-        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
-        startHDFS();
-
-        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
-        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
-        driver = new Driver(HyracksUtils.CC_HOST,
-                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
-    }
-    
-    /*
-     * Merge and copy a DFS directory to a local destination, converting to text if necessary. Also locally store the binary-formatted result if available.
-     */
-    private static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
-        String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
-        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
-            // for text files, just concatenate them together
-            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
-                    FileSystem.getLocal(new Configuration()), new Path(localDestFile),
-                    false, conf, null);
-        } else {
-            // file is binary
-            // merge and store the binary format
-            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
-                    FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
-                    false, conf, null);
-            // load the Node's and write them out as text locally
-            FileSystem.getLocal(new Configuration()).mkdirs(new Path(localDestFile).getParent());
-            File filePathTo = new File(localDestFile);
-            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-            for (int i=0; i < java.lang.Integer.MAX_VALUE; i++) {
-                Path path = new Path(hdfsSrcDir + "part-" + i);
-                FileSystem dfs = FileSystem.get(conf);
-                if (!dfs.exists(path)) {
-                    break;
-                }
-                if (dfs.getFileStatus(path).getLen() == 0) {
-                    continue;
-                }
-                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-                NodeWritable key = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH));
-                NullWritable value = NullWritable.get();
-                while (reader.next(key, value)) {
-                    if (key == null || value == null) {
-                        break;
-                    }
-                    bw.write(key.toString() + "\t" + value.toString());
-                    System.out.println(key.toString() + "\t" + value.toString());
-                    bw.newLine();
-                }
-                reader.close();
-            }
-            bw.close();
-        }
-
-    }
-    
-    private boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
-        File dumped = new File(actualPath); 
-        if (poslistField != null) {
-            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
-        } else {
-            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
-        }
-        return true;
-    }
-
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
-
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
-        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
-        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
-
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
-        conf.writeXml(confOutput);
-        confOutput.close();
-    }
-    
-    private void copySequenceToDFS() throws IOException {
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(LOCAL_SEQUENCE_FILE);
-        Path dest = new Path(HDFS_SEQUENCE);
-        dfs.mkdirs(dest);
-        // dfs.mkdirs(result);
-        dfs.copyFromLocalFile(src, dest);
-    }
-    
-    @BeforeClass
-    public static void cleanUpEntry() throws IOException {
-        // local cleanup
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        if (lfs.exists(new Path(ACTUAL_ROOT))) {
-            lfs.delete(new Path(ACTUAL_ROOT), true);
-        }
-        // dfs cleanup
-        FileSystem dfs = FileSystem.get(conf);
-        String[] paths = {HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED};
-        for (String path : paths) {
-            if (dfs.exists(new Path(path))) {
-                dfs.delete(new Path(path), true);
-            }
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        HyracksUtils.deinit();
-        cleanupHDFS();
-    }
-
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
 }
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..544f5fc
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
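+ *
+ * Typical usage, following TestPathMergeH3 in this patch (a sketch; the subclass
+ * name and paths below are illustrative):
+ *
+ *   public class MyGraphTest extends GenomixMiniClusterTest {
+ *       {
+ *           KMER_LENGTH = 5;
+ *           HDFS_PATHS = new ArrayList<String>(Arrays.asList("/my-input/", "/my-output/"));
+ *           key = new NodeWritable(KMER_LENGTH);
+ *           value = NullWritable.get();
+ *       }
+ *   }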
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+    
+    // subclasses should populate this with the HDFS directories to clean up between runs
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected String ACTUAL_ROOT = "src/test/resources/actual/";
+    
+    protected String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    
+    protected MiniDFSCluster dfsCluster;
+    protected JobConf conf = new JobConf();
+    protected int numberOfNC = 1;
+    protected int numPartitionPerMachine = 1;
+    protected Driver driver;
+
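+    // key/value instances for reading back SequenceFile results; subclasses must initialize these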
+    protected Writable key;
+    protected Writable value;
+    protected MiniMRCluster mrCluster;
+
+    @Before
+    public void setUp() throws Exception {
+        cleanupStores();
+        HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+        startHDFS();
+
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+        driver = new Driver(HyracksUtils.CC_HOST,
+                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+    
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
+     * Also locally store the binary-formatted result if available.
+     */
+    protected void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+        String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+                    FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+                    false, conf, null);
+        } else {
+            // the output is binary: dump each record as text locally and also
+            // merge the partitions into a local .bin copy via the writer below
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            FileSystem dfs = FileSystem.get(conf);
+            // probe the first partition to discover the key/value classes for the local binary copy
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, new Path(hdfsSrcDir + "part-0"), conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile + ".bin"),
+                    reader.getKeyClass(), reader.getValueClass());
+            reader.close(); // close the probe; each partition is reopened inside the loop below
+
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                Path path = new Path(hdfsSrcDir + "part-" + i);
+                if (!dfs.exists(path)) {
+                    break;
+                }
+                if (dfs.getFileStatus(path).getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, path, conf);
+                // next() refills key/value in place and returns false at EOF
+                while (reader.next(key, value)) {
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+    }
+    
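+    // the TestUtils comparators are expected to throw on mismatch, so reaching the end means success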
+    protected boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath); 
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected void startHDFS() throws IOException {
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        //mrCluster = new MiniMRCluster(4, dfsCluster.getFileSystem().getUri().toString(), 2);
+        
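+        // persist the merged configuration to ACTUAL_ROOT/conf.xml so external drivers
+        // (e.g., the MergePathsH3Driver run call above) can load the same settings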
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(ACTUAL_ROOT + "conf.xml")));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+    
+    protected void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(localSrc);
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
+    }
+    
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        FileSystem dfs = FileSystem.get(conf);
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+    protected void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        //mrCluster.shutdown();
+    }
+}