Merge branch 'anbangx/fullstack_genomix' into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/pom.xml b/genomix/genomix-data/pom.xml
index 18500cc..d164b0c 100644
--- a/genomix/genomix-data/pom.xml
+++ b/genomix/genomix-data/pom.xml
@@ -50,6 +50,11 @@
 			<type>maven-plugin</type>
 		</dependency>
 		<dependency>
+			<groupId>jfree</groupId>
+			<artifactId>jfreechart</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 			<version>3.1</version>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
index cfe4dfd..63758f2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
@@ -79,6 +79,11 @@
         @Option(name = "-followsGraphBuild", usage = "whether or not the given input is output from a previous graph-build", required = false)
         private boolean followsGraphBuild = false;
         
+        @Option(name = "-clusterWaitTime", usage = "the amount of time (in ms) to wait between starting/stopping CC/NC", required = false)
+        private int clusterWaitTime = -1;
+        
+        @Option(name = "-drawStatistics", usage = "Plot coverage statistics after graphbuilding stages", required = false)
+        private boolean drawStatistics = false;
 
         // Graph cleaning
         @Option(name = "-bridgeRemove_maxLength", usage = "Nodes with length <= bridgeRemoveLength that bridge separate paths are removed from the graph", required = false)
@@ -115,9 +120,6 @@
         @Option(name = "-profile", usage = "Whether or not to do runtime profifling", required = false)
         private boolean profile = false;
         
-        @Option(name = "-coresPerMachine", usage="the number of cores available in each machine", required=false)
-        private int coresPerMachine = -1;
-        
         @Option(name = "-runLocal", usage = "Run a local instance using the Hadoop MiniCluster. NOTE: overrides settings for -ip and -port and those in conf/*.properties", required=false)
         private boolean runLocal = false;
         
@@ -143,7 +145,6 @@
         TIP_REMOVE,
         SCAFFOLD,
         SPLIT_REPEAT,
-        STATS,
         DUMP_FASTA;
         
         /**
@@ -176,6 +177,8 @@
     public static final String LOCAL_OUTPUT_DIR = "genomix.final.local.output.dir";
     public static final String SAVE_INTERMEDIATE_RESULTS = "genomix.save.intermediate.results";
     public static final String FOLLOWS_GRAPH_BUILD = "genomix.follows.graph.build";
+    public static final String CLUSTER_WAIT_TIME = "genomix.cluster.wait.time";
+    public static final String DRAW_STATISTICS = "genomix.draw.statistics";
     
     // Graph cleaning
     public static final String BRIDGE_REMOVE_MAX_LENGTH = "genomix.bridgeRemove.maxLength";
@@ -194,31 +197,11 @@
     public static final String RUN_LOCAL = "genomix.runLocal";
 
     // TODO should these also be command line options?
-    public static final String CORES_PER_MACHINE = "genomix.driver.duplicate.num";
     public static final String FRAME_SIZE = "genomix.framesize";
     public static final String FRAME_LIMIT = "genomix.framelimit";
-    public static final String TABLE_SIZE = "genomix.tablesize";
     public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
     public static final String OUTPUT_FORMAT = "genomix.graph.output";
 
-    /** Configurations used by hybrid groupby function in graph build phrase */
-    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
-    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
-    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
-    public static final int DEFAULT_FRAME_SIZE = 65536;
-    public static final int DEFAULT_FRAME_LIMIT = 65536;
-    public static final int DEFAULT_TABLE_SIZE = 10485767;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
-    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
-
-    public static final String GROUPBY_TYPE_HYBRID = "hybrid";
-    public static final String GROUPBY_TYPE_EXTERNAL = "external";
     public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
     
     public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
@@ -227,16 +210,21 @@
     public static final String OUTPUT_FORMAT_BINARY = "genomix.outputformat.binary";
     public static final String OUTPUT_FORMAT_TEXT = "genomix.outputformat.text";
     public static final String HDFS_WORK_PATH = "genomix.hdfs.work.path";
+    public static final String HYRACKS_IO_DIRS = "genomix.hyracks.IO_DIRS";
+    public static final String HYRACKS_SLAVES = "genomix.hyracks.slaves.list";
+    
     private static final Patterns[] DEFAULT_PIPELINE_ORDER = {
                     Patterns.BUILD, Patterns.MERGE, 
                     Patterns.LOW_COVERAGE, Patterns.MERGE,
                     Patterns.TIP_REMOVE, Patterns.MERGE,
-//                    Patterns.BUBBLE, Patterns.MERGE,
-//                    Patterns.SPLIT_REPEAT, Patterns.MERGE,
-//                    Patterns.SCAFFOLD, Patterns.MERGE
+                    Patterns.BUBBLE, Patterns.MERGE,
+                    Patterns.SPLIT_REPEAT, Patterns.MERGE,
+                    Patterns.SCAFFOLD, Patterns.MERGE
             };
     
     
     private String[] extraArguments = {};
     
     private static Map<String, Long> tickTimes = new HashMap<String, Long>(); 
@@ -355,10 +343,13 @@
             set(HDFS_WORK_PATH, "genomix_out");  // should be in the user's home directory? 
         
         // hyracks-specific
-        if (getInt(CORES_PER_MACHINE, -1) == -1)
-            setInt(CORES_PER_MACHINE, 4);
-        if (getInt(FRAME_SIZE, -1) == -1)
-            setInt(FRAME_SIZE, DEFAULT_FRAME_SIZE);
+        if (getInt(CLUSTER_WAIT_TIME, -1) == -1)
+            setInt(CLUSTER_WAIT_TIME, 6000);
+        
+        if (get(DRAW_STATISTICS) == null)
+            setBoolean(DRAW_STATISTICS, false);
         
 //        if (getBoolean(RUN_LOCAL, false)) {
 //            // override any other settings for HOST and PORT
@@ -389,12 +380,9 @@
             set(HDFS_WORK_PATH, opts.hdfsWorkPath);
         setBoolean(SAVE_INTERMEDIATE_RESULTS, opts.saveIntermediateResults);
         setBoolean(FOLLOWS_GRAPH_BUILD, opts.followsGraphBuild);
+        setInt(CLUSTER_WAIT_TIME, opts.clusterWaitTime);
+        setBoolean(DRAW_STATISTICS, opts.drawStatistics);
             
-
-//        if (opts.runLocal && (opts.ipAddress != null || opts.port != -1))
-//            throw new IllegalArgumentException("Option -runLocal cannot be set at the same time as -port or -ip! (-runLocal starts a cluster; -ip and -port specify an existing cluster)");
-        if (opts.runLocal)
-            throw new IllegalArgumentException("runLocal is currently unsupported!");
         setBoolean(RUN_LOCAL, opts.runLocal);
         
         // Hyracks/Pregelix Setup
@@ -402,7 +390,6 @@
             set(IP_ADDRESS, opts.ipAddress);
         setInt(PORT, opts.port);
         setBoolean(PROFILE, opts.profile);
-        setInt(CORES_PER_MACHINE, opts.coresPerMachine);
         
         // Graph cleaning
         setInt(BRIDGE_REMOVE_MAX_LENGTH, opts.bridgeRemove_maxLength);
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
new file mode 100644
index 0000000..20bfceb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.jfree.chart.ChartFactory;
+import org.jfree.chart.ChartUtilities;
+import org.jfree.chart.JFreeChart;
+import org.jfree.chart.plot.PlotOrientation;
+import org.jfree.data.xy.XYDataset;
+import org.jfree.data.xy.XYSeries;
+import org.jfree.data.xy.XYSeriesCollection;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DriverUtils {
+
+    public static final Log LOG = LogFactory.getLog(DriverUtils.class);
+
+    /*
+     * Get the IP address of the master node using the bin/getip.sh script
+     */
+    public static String getIP(String hostName) throws IOException, InterruptedException {
+        String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
+                + File.separator + "getip.sh\"";
+        Process p = Runtime.getRuntime().exec(getIPCmd);
+        p.waitFor(); // wait for ssh 
+        String stdout = IOUtils.toString(p.getInputStream()).trim();
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
+                    + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
+        return stdout;
+    }
+
+    /**
+     * set the CC's IP address and port from the cluster.properties and `getip.sh` script
+     */
+    public static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
+        Properties CCProperties = new Properties();
+        CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
+                + File.separator + "cluster.properties"));
+        if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
+            conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
+        if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
+            conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
+        }
+        if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
+            conf.set(GenomixJobConf.FRAME_SIZE, CCProperties.getProperty("FRAME_SIZE"));
+        if (conf.get(GenomixJobConf.FRAME_LIMIT) == null)
+            conf.set(GenomixJobConf.FRAME_LIMIT, CCProperties.getProperty("FRAME_LIMIT"));
+        if (conf.get(GenomixJobConf.HYRACKS_IO_DIRS) == null)
+            conf.set(GenomixJobConf.HYRACKS_IO_DIRS, CCProperties.getProperty("IO_DIRS"));
+        if (conf.get(GenomixJobConf.HYRACKS_SLAVES) == null) {
+            String slaves = FileUtils.readFileToString(new File(System.getProperty("app.home", ".") + File.separator + "conf"
+                    + File.separator + "slaves"));
+            conf.set(GenomixJobConf.HYRACKS_SLAVES, slaves);
+        }
+    }
+    
+    public static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
+        LOG.info("Getting coverage statistics...");
+        GenomixJobConf.tick("drawStatistics");
+        FileSystem dfs = FileSystem.get(conf);
+
+        // stream in the graph, counting coverage as we go; this would be better as a hadoop job that aggregates, perhaps into counters
+        SequenceFile.Reader reader = null;
+        VKmerBytesWritable key = null;
+        NodeWritable value = null;
+        TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
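+        // histogram: rounded average coverage -> number of nodes at that coverage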
+        FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
+                        Integer cov = Math.round(value.getAverageCoverage());
+                        Long count = coverageCounts.get(cov);
+                        if (count == null)
+                            coverageCounts.put(cov, 1L);
+                        else
+                            coverageCounts.put(cov, count + 1);
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+            }
+        }
+
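+        // plot the coverage histogram as an XY line chart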
+        XYSeries series = new XYSeries("Kmer Coverage");
+        for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
+            series.add(pair.getKey().floatValue(), pair.getValue().longValue());
+        }
+        XYDataset xyDataset = new XYSeriesCollection(series);
+        JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
+                "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
+
+        // Write the data to the output stream:
+        FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
+        ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
+        chartOut.flush();
+        chartOut.close();
+        LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
+    }
+
+    public static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
+            throws IOException {
+        LOG.info("Dumping graph to fasta...");
+        GenomixJobConf.tick("dumpGraph");
+        FileSystem dfs = FileSystem.get(conf);
+
+        // stream in the graph and write each node out in FASTA format; this would be better as a hadoop job
+        SequenceFile.Reader reader = null;
+        VKmerBytesWritable key = null;
+        NodeWritable value = null;
+        BufferedWriter bw = null;
+        FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    if (bw == null)
+                        bw = new BufferedWriter(new FileWriter(outputFasta));
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
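+                        // immediately after graph build, a node's sequence is just its kmer key; later stages store the merged sequence as the internal kmer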
+                        bw.write(">node_" + key.toString() + "\n");
+                        bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
+                        bw.newLine();
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+            }
+        }
+        if (bw != null)
+            bw.close();
+        LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
+    }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
new file mode 100644
index 0000000..a3e1e0a
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
+
+/**
+ * Manages the lifecycle of the clusters used by the genomix pipeline: local,
+ * in-process miniclusters (for -runLocal) or the real Hyracks, Pregelix, and
+ * Hadoop clusters reached via the bin/ scripts.
+ */
+public class GenomixClusterManager {
+
+    public enum ClusterType {
+        HYRACKS,
+        PREGELIX,
+        HADOOP
+    }
+
+    private static final Log LOG = LogFactory.getLog(GenomixClusterManager.class);
+    public static final String LOCAL_HOSTNAME = "localhost";
+    public static final String LOCAL_IP = "127.0.0.1";
+    public static final int LOCAL_HYRACKS_CLIENT_PORT = 3099;
+    public static final int LOCAL_HYRACKS_CC_PORT = 1099;
+    public static final int LOCAL_PREGELIX_CLIENT_PORT = 3097;
+    public static final int LOCAL_PREGELIX_CC_PORT = 1097;
+
+    private ClusterControllerService localHyracksCC;
+    private NodeControllerService localHyracksNC;
+    private ClusterControllerService localPregelixCC;
+    private NodeControllerService localPregelixNC;
+    private MiniDFSCluster localDFSCluster;
+    private MiniMRCluster localMRCluster;
+
+    private final boolean runLocal;
+    private final GenomixJobConf conf;
+    private boolean jarsCopiedToHadoop = false;
+
+    private HashMap<ClusterType, Thread> shutdownHooks = new HashMap<ClusterType, Thread>();
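+    // one JVM shutdown hook per cluster type, so an aborted run still stops whatever it started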
+
+    public GenomixClusterManager(boolean runLocal, GenomixJobConf conf) {
+        this.runLocal = runLocal;
+        this.conf = conf;
+    }
+
+    /**
+     * Start a cluster of the given type. If runLocal is specified, we will create an in-memory version of the cluster.
+     */
+    public void startCluster(ClusterType clusterType) throws Exception {
+        addClusterShutdownHook(clusterType);
+        switch (clusterType) {
+            case HYRACKS:
+            case PREGELIX:
+                if (runLocal) {
+                    startLocalCC(clusterType);
+                    startLocalNC(clusterType);
+                } else {
+                    int sleepms = Integer.parseInt(conf.get(GenomixJobConf.CLUSTER_WAIT_TIME));
+                    startCC(sleepms);
+                    startNCs(clusterType, sleepms);
+                }
+                break;
+            case HADOOP:
+                if (runLocal)
+                    startLocalMRCluster();
+                else
+                    deployJarsToHadoop();
+                break;
+        }
+    }
+
+    public void stopCluster(ClusterType clusterType) throws Exception {
+        switch (clusterType) {
+            case HYRACKS:
+                if (runLocal) {
+                    if (localHyracksCC != null) {
+                        localHyracksCC.stop();
+                        localHyracksCC = null;
+                    }
+                    if (localHyracksNC != null) {
+                        localHyracksNC.stop();
+                        localHyracksNC = null;
+                    }
+                } else {
+                    shutdownCC();
+                    shutdownNCs();
+                }
+                break;
+            case PREGELIX:
+                if (runLocal) {
+                    if (localPregelixCC != null) {
+                        localPregelixCC.stop();
+                        localPregelixCC = null;
+                    }
+                    if (localPregelixNC != null) {
+                        localPregelixNC.stop();
+                        localPregelixNC = null;
+
+                    }
+                } else {
+                    shutdownCC();
+                    shutdownNCs();
+                }
+                break;
+            case HADOOP:
+                if (runLocal) {
+                    if (localMRCluster != null) {
+                        localMRCluster.shutdown();
+                        localMRCluster = null;
+                    }
+                    if (localDFSCluster != null) {
+                        localDFSCluster.shutdown();
+                        localDFSCluster = null;
+                    }
+                }
+                break;
+        }
+        removeClusterShutdownHook(clusterType);
+    }
+
+    private void startLocalCC(ClusterType clusterType) throws Exception {
+        LOG.info("Starting local CC...");
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = LOCAL_HOSTNAME;
+        ccConfig.clusterNetIpAddress = LOCAL_HOSTNAME;
+        ccConfig.defaultMaxJobAttempts = 0;
+        ccConfig.jobHistorySize = 1;
+        ccConfig.profileDumpPeriod = -1;
+
+        if (clusterType == ClusterType.HYRACKS) {
+            ccConfig.clusterNetPort = LOCAL_HYRACKS_CC_PORT;
+            ccConfig.clientNetPort = LOCAL_HYRACKS_CLIENT_PORT;
+            localHyracksCC = new ClusterControllerService(ccConfig);
+            localHyracksCC.start();
+        } else if (clusterType == ClusterType.PREGELIX) {
+            ccConfig.clusterNetPort = LOCAL_PREGELIX_CC_PORT;
+            ccConfig.clientNetPort = LOCAL_PREGELIX_CLIENT_PORT;
+            localPregelixCC = new ClusterControllerService(ccConfig);
+            localPregelixCC.start();
+        } else {
+            throw new IllegalArgumentException("Invalid CC type: " + clusterType);
+        }
+    }
+
+    private void startLocalNC(ClusterType clusterType) throws Exception {
+        LOG.info("Starting local NC...");
+        //        ClusterConfig.setClusterPropertiesPath(System.getProperty("app.home") + "/conf/cluster.properties");
+        //        ClusterConfig.setStorePath(...);
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.ccHost = LOCAL_HOSTNAME;
+        ncConfig.clusterNetIPAddress = LOCAL_HOSTNAME;
+        ncConfig.dataIPAddress = LOCAL_IP;
+        ncConfig.datasetIPAddress = LOCAL_IP;
+        ncConfig.nodeId = "nc-" + clusterType;
+        ncConfig.ioDevices = "tmp" + File.separator + "t3" + File.separator + clusterType;
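+        // use a distinct io directory per cluster type so local Hyracks and Pregelix NCs don't collide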
+
+        if (clusterType == ClusterType.HYRACKS) {
+            ncConfig.ccPort = LOCAL_HYRACKS_CC_PORT;
+            localHyracksNC = new NodeControllerService(ncConfig);
+            localHyracksNC.start();
+        } else if (clusterType == ClusterType.PREGELIX) {
+            ncConfig.ccPort = LOCAL_PREGELIX_CC_PORT;
+            ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
+            localPregelixNC = new NodeControllerService(ncConfig);
+            localPregelixNC.start();
+        } else {
+            throw new IllegalArgumentException("Invalid NC type: " + clusterType);
+        }
+    }
+
+    private void startLocalMRCluster() throws IOException {
+        LOG.info("Starting local DFS and MR cluster...");
+        localDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+        localMRCluster = new MiniMRCluster(1, localDFSCluster.getFileSystem().getUri().toString(), 1);
+    }
+
+    /**
+     * Walk the current CLASSPATH to get all jar's in use and copy them up to all HDFS nodes
+     * 
+     * @throws IOException
+     */
+    private void deployJarsToHadoop() throws IOException {
+        if (!jarsCopiedToHadoop) {
+            LOG.info("Deploying jars in my classpath to HDFS Distributed Cache...");
+            FileSystem dfs = FileSystem.get(conf);
+            String[] classPath = { System.getenv().get("CLASSPATH"), System.getProperty("java.class.path") };
+            for (String cp : classPath) {
+                if (cp == null)
+                    continue;
+                for (String item : cp.split(":")) {
+                    //                    LOG.info("Checking " + item);
+                    if (item.endsWith(".jar")) {
+                        //                        LOG.info("Deploying " + item);
+                        Path localJar = new Path(item);
+                        Path jarDestDir = new Path(conf.get(GenomixJobConf.HDFS_WORK_PATH) + "/jar-dependencies");
+                        // dist cache requires absolute paths. we have to use the working directory if HDFS_WORK_PATH is relative
+                        if (!jarDestDir.isAbsolute()) {
+                            // working dir is the correct base, but we must use the path version (not a URI). Get URI and strip out leading identifiers
+                            String hostNameRE = "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*";
+                            String[] workDirs = dfs.getWorkingDirectory().toString()
+                                    .split("(hdfs://" + hostNameRE + ":\\d+|file:)", 2);
+                            if (workDirs.length <= 1) {
+                                LOG.info("Didn't find a URI header matching hdfs://host:port or file: ; using the original path instead.");
+                                jarDestDir = new Path(dfs.getWorkingDirectory() + File.separator + jarDestDir);
+                            } else {
+                                jarDestDir = new Path(workDirs[1] + File.separator + jarDestDir);
+                            }
+                        }
+                        dfs.mkdirs(jarDestDir);
+                        Path destJar = new Path(jarDestDir + File.separator + localJar.getName());
+                        dfs.copyFromLocalFile(localJar, destJar);
+                        //                        LOG.info("Jar in distributed cache: " + destJar);
+                        DistributedCache.addFileToClassPath(destJar, conf);
+                    }
+                }
+            }
+        }
+    }
+
+    private static void startNCs(ClusterType type, int sleepms) throws IOException, InterruptedException {
+        LOG.info("Starting NC's");
+        String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "startAllNCs.sh " + type;
+        Process p = Runtime.getRuntime().exec(startNCCmd);
+        p.waitFor(); // wait for ssh 
+        Thread.sleep(sleepms); // wait for NC -> CC registration
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
+                    + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+                    + IOUtils.toString(p.getErrorStream()));
+    }
+
+    private static void startCC(int sleepms) throws IOException, InterruptedException {
+        LOG.info("Starting CC");
+        String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "startcc.sh";
+        Process p = Runtime.getRuntime().exec(startCCCmd);
+        p.waitFor(); // wait for cmd execution
+        Thread.sleep(sleepms); // wait for CC registration
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
+                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+                    + IOUtils.toString(p.getErrorStream()));
+    }
+
+    private static void shutdownCC() throws IOException, InterruptedException {
+        LOG.info("Shutting down any previous CC");
+        String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
+        Process p = Runtime.getRuntime().exec(stopCCCmd);
+        p.waitFor(); // wait for cmd execution
+    }
+
+    private static void shutdownNCs() throws IOException, InterruptedException {
+        LOG.info("Shutting down any previous NC's");
+        String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "stopAllNCs.sh";
+        Process p = Runtime.getRuntime().exec(stopNCCmd);
+        p.waitFor(); // wait for ssh 
+    }
+
+    private void addClusterShutdownHook(final ClusterType clusterType) {
+        if (shutdownHooks.containsKey(clusterType))
+            throw new IllegalArgumentException("Already specified a hook for shutting down a " + clusterType
+                    + " cluster! (Try removing the existing hook first?)");
+        Thread hook = new Thread() {
+            @Override
+            public void run() {
+                LOG.info("Shutting down the cluster...");
+                try {
+                    stopCluster(clusterType);
+                } catch (Exception e) {
+                    System.err.println("Error while shutting the cluster down:");
+                    e.printStackTrace();
+                }
+            }
+        };
+        shutdownHooks.put(clusterType, hook);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    private void removeClusterShutdownHook(final ClusterType clusterType) {
+        if (!shutdownHooks.containsKey(clusterType))
+            //            throw new IllegalArgumentException("There is no shutdown hook for " + clusterType + "!");
+            return; // ignore-- we are cleaning up after a previous run
+        try {
+            Runtime.getRuntime().removeShutdownHook(shutdownHooks.get(clusterType));
+        } catch (IllegalStateException e) {
+            // ignore: we must already be shutting down
+        }
+    }
+
+    public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
+        LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
+        GenomixJobConf.tick("copyLocalToHDFS");
+        FileSystem dfs = FileSystem.get(conf);
+        Path dest = new Path(destDir);
+        dfs.delete(dest, true);
+        dfs.mkdirs(dest);
+
+        File srcBase = new File(localDir);
+        if (srcBase.isDirectory())
+            for (File f : srcBase.listFiles())
+                dfs.copyFromLocalFile(new Path(f.toString()), dest);
+        else
+            dfs.copyFromLocalFile(new Path(localDir), dest);
+
+        LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
+    }
+
+    public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
+        LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
+        GenomixJobConf.tick("copyBinToLocal");
+        FileSystem dfs = FileSystem.get(conf);
+        FileUtils.deleteQuietly(new File(localDestDir));
+
+        // save original binary to output/bin
+        dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
+
+        // convert hdfs sequence files to text as output/text
+        BufferedWriter bw = null;
+        SequenceFile.Reader reader = null;
+        Writable key = null;
+        Writable value = null;
+        FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0 && !f.isDir()) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    if (bw == null)
+                        bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
+                        bw.write(key.toString() + "\t" + value.toString());
+                        bw.newLine();
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error copying " + f + " to local:\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+
+            }
+        }
+        if (bw != null)
+            bw.close();
+        LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
deleted file mode 100644
index 28d6572..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package edu.uci.ics.genomix.minicluster;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.wicket.util.file.File;
-
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class GenomixMiniCluster {
-    
-    public static final String NC1_ID = "nc1";
-    public static final int TEST_HYRACKS_CC_PORT = 1099;
-    public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
-    public static final String CC_HOST = "localhost";
-
-    private static ClusterControllerService cc;
-    private static NodeControllerService nc1;
-    private static MiniDFSCluster dfsCluster;
-    private static Path clusterProperties;
-    private static Path clusterStores;
-    private static Path clusterWorkDir;
-    private static Path clusterStoresDir;
-    
-    public static void init(GenomixJobConf conf) throws Exception {
-        makeLocalClusterConfig();
-        ClusterConfig.setClusterPropertiesPath(clusterProperties.toAbsolutePath().toString());
-        ClusterConfig.setStorePath(clusterStores.toAbsolutePath().toString());
-//        dfsCluster = new MiniDFSCluster(conf, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)), true, null);
-        
-        // cluster controller
-        CCConfig ccConfig = new CCConfig();
-        ccConfig.clientNetIpAddress = CC_HOST;
-        ccConfig.clusterNetIpAddress = CC_HOST;
-        ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
-        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
-        ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 1;
-        ccConfig.profileDumpPeriod = -1;
-        cc = new ClusterControllerService(ccConfig);
-        cc.start();
-
-        // one node controller
-        NCConfig ncConfig1 = new NCConfig();
-        ncConfig1.ccHost = "localhost";
-        ncConfig1.clusterNetIPAddress = "localhost";
-        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
-        ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
-        ncConfig1.nodeId = NC1_ID;
-        ncConfig1.ioDevices = clusterWorkDir.toString() + File.separator + "tmp" + File.separator + "t3";
-        ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
-        nc1 = new NodeControllerService(ncConfig1);
-        nc1.start();
-    }
-
-    public static void deinit() throws Exception {
-        nc1.stop();
-        cc.stop();
-//        dfsCluster.shutdown();
-        FileUtils.deleteQuietly(new File("build"));
-        FileUtils.deleteQuietly(clusterProperties.toFile());
-        FileUtils.deleteQuietly(clusterStores.toFile());
-        FileUtils.deleteQuietly(clusterWorkDir.toFile());
-        FileUtils.deleteQuietly(clusterStoresDir.toFile());
-    }
-
-    /**
-     * create the necessary .properties and directories for a minicluster instance 
-     */
-    private static void makeLocalClusterConfig() throws IOException {
-        clusterProperties = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.cluster", ".properties");
-        clusterWorkDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterWorkDir");
-        PrintWriter writer = new PrintWriter(clusterProperties.toString(), "US-ASCII");
-        writer.println("CC_CLIENTPORT=" + TEST_HYRACKS_CC_CLIENT_PORT);
-        writer.println("CC_CLUSTERPORT=" + TEST_HYRACKS_CC_PORT);
-        writer.println(String.format("WORKPATH=%s", clusterWorkDir.toAbsolutePath().toString()));
-        writer.println("CCTMP_DIR=${WORKPATH}/tmp/t1");
-        writer.println("NCTMP_DIR=${WORKPATH}/tmp/t2");
-        writer.println("CCLOGS_DIR=$CCTMP_DIR/logs");
-        writer.println("NCLOGS_DIR=$NCTMP_DIR/logs");
-        writer.println("IO_DIRS=${WORKPATH}/tmp/t3");
-        writer.println("JAVA_HOME=$JAVA_HOME");
-        writer.println("CLASSPATH=\"${HADOOP_HOME}:${CLASSPATH}:.\"");
-        writer.println("FRAME_SIZE=65536");
-        writer.println("CCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
-        writer.println("NCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
-        writer.close();
-
-        clusterStoresDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterStores");
-        clusterStores = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.stores", ".properties");
-        writer = new PrintWriter(clusterStores.toString(), "US-ASCII");
-        writer.println(String.format("store=%s", clusterStoresDir.toString()));
-        writer.close();
-    }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index 758051d..968c492 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -331,7 +331,8 @@
         for (EdgeWritable e : edges) {
             VKmerBytesWritable key = e.getKey();
             if (unionEdges.containsKey(key)) {
-                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+//                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+                unionEdges.get(key).unionUpdate(e.getReadIDs());
             }
             else {
                 unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
@@ -340,7 +341,8 @@
         for (EdgeWritable e : other.edges) {
             VKmerBytesWritable key = e.getKey();
             if (unionEdges.containsKey(key)) {
-                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+//                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+                unionEdges.get(key).unionUpdate(e.getReadIDs());
             }
             else {
                 unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
deleted file mode 100644
index f3a4d6a..0000000
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
+++ /dev/null
@@ -1,316 +0,0 @@
-package edu.uci.ics.genomix.driver;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.jfree.chart.ChartFactory;
-import org.jfree.chart.ChartUtilities;
-import org.jfree.chart.JFreeChart;
-import org.jfree.chart.plot.PlotOrientation;
-import org.jfree.data.xy.XYDataset;
-import org.jfree.data.xy.XYSeries;
-import org.jfree.data.xy.XYSeriesCollection;
-
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class DriverUtils {
-
-    enum NCTypes {
-        HYRACKS,
-        PREGELIX
-    }
-
-    private static final Log LOG = LogFactory.getLog(DriverUtils.class);
-
-    /*
-     * Get the IP address of the master node using the bin/getip.sh script
-     */
-    static String getIP(String hostName) throws IOException, InterruptedException {
-        String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
-                + File.separator + "getip.sh\"";
-        Process p = Runtime.getRuntime().exec(getIPCmd);
-        p.waitFor(); // wait for ssh 
-        String stdout = IOUtils.toString(p.getInputStream()).trim();
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
-                    + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
-        return stdout;
-    }
-
-    /**
-     * set the CC's IP address and port from the cluster.properties and `getip.sh` script
-     */
-    static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
-        Properties CCProperties = new Properties();
-        CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
-                + File.separator + "cluster.properties"));
-        if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
-            conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
-        if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
-            conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
-        }
-        if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
-            conf.set(GenomixJobConf.FRAME_SIZE,
-                    CCProperties.getProperty("FRAME_SIZE", String.valueOf(GenomixJobConf.DEFAULT_FRAME_SIZE)));
-    }
-
-    static void startNCs(NCTypes type) throws IOException {
-        LOG.info("Starting NC's");
-        String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "startAllNCs.sh " + type;
-        Process p = Runtime.getRuntime().exec(startNCCmd);
-        try {
-            p.waitFor(); // wait for ssh 
-            Thread.sleep(5000); // wait for NC -> CC registration
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
-                    + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void shutdownNCs() throws IOException {
-        LOG.info("Shutting down any previous NC's");
-        String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "stopAllNCs.sh";
-        Process p = Runtime.getRuntime().exec(stopNCCmd);
-        try {
-            p.waitFor(); // wait for ssh 
-            //            Thread.sleep(3000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    static void startCC() throws IOException {
-        LOG.info("Starting CC");
-        String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "startcc.sh";
-        Process p = Runtime.getRuntime().exec(startCCCmd);
-        try {
-            p.waitFor(); // wait for cmd execution
-            Thread.sleep(6000); // wait for CC registration
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
-                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void shutdownCC() throws IOException {
-        LOG.info("Shutting down CC");
-        String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
-        Process p = Runtime.getRuntime().exec(stopCCCmd);
-        try {
-            p.waitFor(); // wait for cmd execution
-            //            Thread.sleep(2000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        //        if (p.exitValue() != 0)
-        //            throw new RuntimeException("Failed to stop the genomix CC! Script returned exit code: " + p.exitValue()
-        //                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-        //                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void addClusterShutdownHook(final boolean runLocal) {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                LOG.info("Shutting down the cluster");
-                try {
-                    if (runLocal)
-                        GenomixMiniCluster.deinit();
-                    else {
-                        DriverUtils.shutdownNCs();
-                        DriverUtils.shutdownCC();
-                    }
-                } catch (Exception e) {
-                    System.err.println("Error while shutting the cluster down:");
-                    e.printStackTrace();
-                }
-            }
-        });
-    }
-
-    public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
-        LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
-        GenomixJobConf.tick("copyLocalToHDFS");
-        FileSystem dfs = FileSystem.get(conf);
-        Path dest = new Path(destDir);
-        dfs.delete(dest, true);
-        dfs.mkdirs(dest);
-
-        File srcBase = new File(localDir);
-        if (srcBase.isDirectory())
-            for (File f : srcBase.listFiles())
-                dfs.copyFromLocalFile(new Path(f.toString()), dest);
-        else
-            dfs.copyFromLocalFile(new Path(localDir), dest);
-
-        LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
-    }
-
-    public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
-        LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
-        GenomixJobConf.tick("copyBinToLocal");
-        FileSystem dfs = FileSystem.get(conf);
-        FileUtils.deleteQuietly(new File(localDestDir));
-
-        // save original binary to output/bin
-        dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
-
-        // convert hdfs sequence files to text as output/text
-        BufferedWriter bw = null;
-        SequenceFile.Reader reader = null;
-        Writable key = null;
-        Writable value = null;
-        FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0 && !f.isDir()) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    if (bw == null)
-                        bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        bw.write(key.toString() + "\t" + value.toString());
-                        bw.newLine();
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error copying " + f + " to local:\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-
-            }
-        }
-        if (bw != null)
-            bw.close();
-        LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
-    }
-
-    static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
-        LOG.info("Getting coverage statistics...");
-        GenomixJobConf.tick("drawStatistics");
-        FileSystem dfs = FileSystem.get(conf);
-
-        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
-        SequenceFile.Reader reader = null;
-        VKmerBytesWritable key = null;
-        NodeWritable value = null;
-        TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
-        FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        Integer cov = java.lang.Math.round(value.getAverageCoverage());
-                        Long count = coverageCounts.get(cov);
-                        if (count == null)
-                            coverageCounts.put(cov, new Long(1));
-                        else
-                            coverageCounts.put(cov, count + 1);
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-            }
-        }
-
-        XYSeries series = new XYSeries("Kmer Coverage");
-        for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
-            series.add(pair.getKey().floatValue(), pair.getValue().longValue());
-        }
-        XYDataset xyDataset = new XYSeriesCollection(series);
-        JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
-                "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
-
-        // Write the data to the output stream:
-        FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
-        ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
-        chartOut.flush();
-        chartOut.close();
-        LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
-    }
-
-    static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
-            throws IOException {
-        LOG.info("Dumping graph to fasta...");
-        GenomixJobConf.tick("dumpGraph");
-        FileSystem dfs = FileSystem.get(conf);
-
-        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
-        SequenceFile.Reader reader = null;
-        VKmerBytesWritable key = null;
-        NodeWritable value = null;
-        BufferedWriter bw = null;
-        FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    if (bw == null)
-                        bw = new BufferedWriter(new FileWriter(outputFasta));
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        bw.write(">node_" + key.toString() + "\n");
-                        bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
-                        bw.newLine();
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-            }
-        }
-        if (bw != null)
-            bw.close();
-        LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
-    }
-
-}
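
Note: the two methods deleted above, drawStatistics and dumpGraph, reappear through the new edu.uci.ics.genomix.minicluster.DriverUtils import in the driver diff below. As a reference for the charting half, here is a minimal, self-contained sketch of the JFreeChart 1.0.13 calls the deleted code used; the TreeMap contents are fabricated sample data and the class name is invented for illustration:

    import java.io.FileOutputStream;
    import java.util.Map;
    import java.util.TreeMap;

    import org.jfree.chart.ChartFactory;
    import org.jfree.chart.ChartUtilities;
    import org.jfree.chart.JFreeChart;
    import org.jfree.chart.plot.PlotOrientation;
    import org.jfree.data.xy.XYSeries;
    import org.jfree.data.xy.XYSeriesCollection;

    public class CoverageChartSketch {
        public static void main(String[] args) throws Exception {
            // coverage -> number of kmers observed at that coverage (sample values)
            TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
            coverageCounts.put(1, 120L);
            coverageCounts.put(2, 340L);
            coverageCounts.put(3, 95L);

            XYSeries series = new XYSeries("Kmer Coverage");
            for (Map.Entry<Integer, Long> pair : coverageCounts.entrySet())
                series.add(pair.getKey().doubleValue(), pair.getValue().doubleValue());

            JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer", "Coverage",
                    "Count", new XYSeriesCollection(series), PlotOrientation.VERTICAL, true, true, false);

            // same 800x600 PNG size the driver writes
            FileOutputStream chartOut = new FileOutputStream("coverage.png");
            try {
                ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
            } finally {
                chartOut.close();
            }
        }
    }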
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
index adfae45..3312771 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
@@ -35,9 +35,10 @@
 
 import edu.uci.ics.genomix.config.GenomixJobConf;
 import edu.uci.ics.genomix.config.GenomixJobConf.Patterns;
-import edu.uci.ics.genomix.driver.DriverUtils.NCTypes;
 import edu.uci.ics.genomix.hyracks.graph.driver.Driver.Plan;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
+import edu.uci.ics.genomix.minicluster.DriverUtils;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager.ClusterType;
 import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
 import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -64,43 +65,50 @@
     private String prevOutput;
     private String curOutput;
     private int stepNum;
-    private List<PregelixJob> jobs;
+    private List<PregelixJob> pregelixJobs;
     private boolean followingBuild = false; // need to adapt the graph immediately after building
+    private boolean runLocal;
+    private int numCoresPerMachine;
+    private int numMachines;
 
+    private GenomixClusterManager manager;
     private edu.uci.ics.genomix.hyracks.graph.driver.Driver hyracksDriver;
     private edu.uci.ics.pregelix.core.driver.Driver pregelixDriver;
 
-    private void buildGraphWithHyracks(GenomixJobConf conf) throws NumberFormatException, IOException {
-        DriverUtils.shutdownNCs();
-        DriverUtils.shutdownCC();
-        DriverUtils.startCC();
-        DriverUtils.startNCs(NCTypes.HYRACKS);
+    private void buildGraphWithHyracks(GenomixJobConf conf) throws Exception {
         LOG.info("Building Graph using Hyracks...");
+        manager.startCluster(ClusterType.HYRACKS);
         GenomixJobConf.tick("buildGraphWithHyracks");
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(conf.get(GenomixJobConf.IP_ADDRESS),
-                Integer.parseInt(conf.get(GenomixJobConf.PORT)), Integer.parseInt(conf
-                        .get(GenomixJobConf.CORES_PER_MACHINE)));
+        String hyracksIP = runLocal ? GenomixClusterManager.LOCAL_IP : conf.get(GenomixJobConf.IP_ADDRESS);
+        int hyracksPort = runLocal ? GenomixClusterManager.LOCAL_HYRACKS_CLIENT_PORT : Integer.parseInt(conf.get(GenomixJobConf.PORT));
+        hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(hyracksIP, hyracksPort, numCoresPerMachine);
         hyracksDriver.runJob(conf, Plan.BUILD_UNMERGED_GRAPH, Boolean.parseBoolean(conf.get(GenomixJobConf.PROFILE)));
         followingBuild = true;
+        manager.stopCluster(ClusterType.HYRACKS);
         LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHyracks") + "ms");
+        if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+            DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
     }
 
-    private void buildGraphWithHadoop(GenomixJobConf conf) throws IOException {
+    private void buildGraphWithHadoop(GenomixJobConf conf) throws Exception {
         LOG.info("Building Graph using Hadoop...");
+        manager.startCluster(ClusterType.HADOOP);
         GenomixJobConf.tick("buildGraphWithHadoop");
         DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF)));
         conf.writeXml(confOutput);
         confOutput.close();
-        // TODO copy the jars up to DFS
         edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver hadoopDriver = new edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver();
-        hadoopDriver.run(prevOutput, curOutput, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)),
+        hadoopDriver.run(prevOutput, curOutput, numCoresPerMachine * numMachines,
                 Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH)), 4 * 100000, true, HADOOP_CONF);
         FileUtils.deleteQuietly(new File(HADOOP_CONF));
         System.out.println("Finished job Hadoop-Build-Graph");
         followingBuild = true;
-        LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop"));
+        manager.stopCluster(ClusterType.HADOOP);
+        LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop") + "ms");
+        if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+            DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
     }
 
     @SuppressWarnings("deprecation")
@@ -114,27 +122,30 @@
     private void addJob(PregelixJob job) {
         if (followingBuild)
             job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
-        jobs.add(job);
+        pregelixJobs.add(job);
         followingBuild = false;
     }
 
     public void runGenomix(GenomixJobConf conf) throws NumberFormatException, HyracksException, Exception {
+        LOG.info("Starting Genomix Assembler Pipeline...");
+        GenomixJobConf.tick("runGenomix");
+
         DriverUtils.updateCCProperties(conf);
+        numCoresPerMachine = conf.get(GenomixJobConf.HYRACKS_IO_DIRS).split(",").length;
+        numMachines = conf.get(GenomixJobConf.HYRACKS_SLAVES).split("\r?\n|\r").length; // split on newlines
         GenomixJobConf.setGlobalStaticConstants(conf);
         followingBuild = Boolean.parseBoolean(conf.get(GenomixJobConf.FOLLOWS_GRAPH_BUILD));
-        jobs = new ArrayList<PregelixJob>();
+        pregelixJobs = new ArrayList<PregelixJob>();
         stepNum = 0;
-        boolean dump = false;
-        final boolean runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
-        if (runLocal)
-            GenomixMiniCluster.init(conf);
-        DriverUtils.addClusterShutdownHook(runLocal); // *always* shut down any CCs or NCs we start
+        runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
+        manager = new GenomixClusterManager(runLocal, conf);
+        manager.stopCluster(ClusterType.HYRACKS); // shut down any existing NCs and CCs
 
         String localInput = conf.get(GenomixJobConf.LOCAL_INPUT_DIR);
         if (localInput != null) {
             conf.set(GenomixJobConf.INITIAL_INPUT_DIR, conf.get(GenomixJobConf.HDFS_WORK_PATH) + File.separator
                     + "00-initial-input-from-genomix-driver");
-            DriverUtils.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
+            GenomixClusterManager.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
         }
         curOutput = conf.get(GenomixJobConf.INITIAL_INPUT_DIR);
 
@@ -193,53 +204,51 @@
                     setOutput(conf, Patterns.SCAFFOLD);
                     addJob(ScaffoldingVertex.getConfiguredJob(conf, ScaffoldingVertex.class));
                     break;
-                case STATS:
-                    DriverUtils.drawStatistics(conf, curOutput, "coverage.png");
-                    break;
                 case DUMP_FASTA:
-                    dump = true;
+                    DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
                     break;
             }
         }
 
-        if (jobs.size() > 0) {
-            DriverUtils.shutdownNCs();
-            DriverUtils.shutdownCC();
-            DriverUtils.startCC();
-            DriverUtils.startNCs(NCTypes.PREGELIX);
+        if (pregelixJobs.size() > 0) {
+            manager.startCluster(ClusterType.PREGELIX);
             pregelixDriver = new edu.uci.ics.pregelix.core.driver.Driver(this.getClass());
-        }
-        // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
-        // this would let them resume at arbitrary points of the pipeline
-        if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
-            for (int i = 0; i < jobs.size(); i++) {
-                LOG.info("Starting job " + jobs.get(i).getJobName());
-                GenomixJobConf.tick("pregelix-job");
-
-                pregelixDriver.runJob(jobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
-                        Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-
-                LOG.info("Finished job " + jobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+            String pregelixIP = runLocal ? GenomixClusterManager.LOCAL_IP : conf.get(GenomixJobConf.IP_ADDRESS);
+            int pregelixPort = runLocal ? GenomixClusterManager.LOCAL_PREGELIX_CLIENT_PORT : Integer.parseInt(conf.get(GenomixJobConf.PORT));
+            // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
+            // this would let them resume at arbitrary points of the pipeline
+            if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
+                LOG.info("Starting pregelix job series (saving intermediate results)...");
+                GenomixJobConf.tick("pregelix-runJob-one-by-one");
+                for (int i = 0; i < pregelixJobs.size(); i++) {
+                    LOG.info("Starting job " + pregelixJobs.get(i).getJobName());
+                    GenomixJobConf.tick("pregelix-job");
+                    pregelixDriver.runJob(pregelixJobs.get(i), pregelixIP, pregelixPort);
+                    LOG.info("Finished job " + pregelixJobs.get(i).getJobName() + " in "
+                            + GenomixJobConf.tock("pregelix-job"));
+                }
+                LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJob-one-by-one"));
+            } else {
+                LOG.info("Starting pregelix job series (not saving intermediate results...");
+                GenomixJobConf.tick("pregelix-runJobs");
+                pregelixDriver.runJobs(pregelixJobs, pregelixIP, pregelixPort);
+                LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
             }
-        } else {
-            LOG.info("Starting pregelix job series...");
-            GenomixJobConf.tick("pregelix-runJobs");
-            pregelixDriver.runJobs(jobs, conf.get(GenomixJobConf.IP_ADDRESS),
-                    Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-            LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
+            manager.stopCluster(ClusterType.PREGELIX);
         }
 
         if (conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR) != null)
-            DriverUtils.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
-        if (dump)
-            DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
+            GenomixClusterManager.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
         if (conf.get(GenomixJobConf.FINAL_OUTPUT_DIR) != null)
             FileSystem.get(conf).rename(new Path(curOutput), new Path(conf.get(GenomixJobConf.FINAL_OUTPUT_DIR)));
 
+        LOG.info("Finished the Genomix Assembler Pipeline in " + GenomixJobConf.tock("runGenomix") + "ms!");
     }
 
     public static void main(String[] args) throws CmdLineException, NumberFormatException, HyracksException, Exception {
-        String[] myArgs = { "-kmerLength", "5", "-coresPerMachine", "2",
+        String[] myArgs = {
+                "-runLocal", "true",
+                "-kmerLength", "5",
                 //                        "-saveIntermediateResults", "true",
                 //                        "-localInput", "../genomix-pregelix/data/input/reads/synthetic/",
                 "-localInput", "../genomix-pregelix/data/input/reads/pathmerge",
@@ -251,14 +260,16 @@
                 //                            "-pipelineOrder", "BUILD,MERGE",
                 //                            "-inputDir", "/home/wbiesing/code/hyracks/genomix/genomix-driver/graphbuild.binmerge",
                 //                "-localInput", "../genomix-pregelix/data/TestSet/PathMerge/CyclePath/bin/part-00000", 
-                "-pipelineOrder", "BUILD_HYRACKS,MERGE" };
+                "-pipelineOrder", "MERGE" };
+        // allow Eclipse to run the maven-generated scripts
                 if (System.getProperty("app.home") == null)
-                    System.setProperty("app.home", "/home/wbiesing/code/hyracks/genomix/genomix-driver/target/appassembler");
+                    System.setProperty("app.home", new File("target/appassembler").getAbsolutePath());
 
         //        Patterns.BUILD, Patterns.MERGE, 
         //        Patterns.TIP_REMOVE, Patterns.MERGE,
         //        Patterns.BUBBLE, Patterns.MERGE,
         GenomixJobConf conf = GenomixJobConf.fromArguments(args);
         GenomixDriver driver = new GenomixDriver();
         driver.runGenomix(conf);
     }
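
Note: both build paths above now follow the same bracketing, startCluster before the engine runs and stopCluster after. A hedged sketch of that shape follows; GenomixClusterManager and ClusterType are the names this patch uses, and the try/finally is a defensive variant, since the patch itself calls stopCluster inline:

    import java.util.concurrent.Callable;

    // hypothetical helper, not part of this patch
    static void runBracketed(GenomixClusterManager manager,
            GenomixClusterManager.ClusterType type, Callable<Void> job) throws Exception {
        manager.startCluster(type);
        try {
            job.call(); // e.g. the Hyracks or Hadoop graph build
        } finally {
            manager.stopCluster(type); // stop even if the job throws
        }
    }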
diff --git a/genomix/genomix-driver/src/main/resources/conf/cluster.properties b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
index 66251be..57aa1fa 100644
--- a/genomix/genomix-driver/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
@@ -1,11 +1,14 @@
 #The CC port for Hyracks clients
+# don't change unless you want multiple CCs on one machine
 CC_CLIENTPORT=3099
 
 #The CC port for Hyracks cluster management
+# don't change unless you want multiple CCs on one machine
 CC_CLUSTERPORT=1099
 
-#The directory of hyracks binaries
-HYRACKS_HOME="../../../../hyracks"
+#Sets the http port for the Cluster Controller (default: 16001)
+# don't change unless you want multiple CCs on one machine
+CC_HTTPPORT=16001
 
 WORKPATH=""
 #The tmp directory for cc to install jars
@@ -30,12 +33,18 @@
 CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
 
 #The frame size of the internal dataflow engine
-FRAME_SIZE=65536
+FRAME_SIZE=65535
+
+#The frame limit of the internal dataflow engine
+FRAME_LIMIT=4096
+
+#The number of jobs to keep logs for
+JOB_HISTORY_SIZE=50
 
 #CC JAVA_OPTS
-CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx5g -Djava.util.logging.config.file=conf/logging.properties"
 # Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
 
 #NC JAVA_OPTS
-NCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=conf/logging.properties"
 
diff --git a/genomix/genomix-driver/src/main/resources/conf/logging.properties b/genomix/genomix-driver/src/main/resources/conf/logging.properties
new file mode 100644
index 0000000..a0f9d77
--- /dev/null
+++ b/genomix/genomix-driver/src/main/resources/conf/logging.properties
@@ -0,0 +1,68 @@
+############################################################
+#  	Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.  
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+#  	Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler 
+# classes.  These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+#handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers.  For any given facility this global level
+# can be overridden by a facility-specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+#.level= SEVERE
+ .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+# java.util.logging.FileHandler.formatter = java.util.logging.SimpleFormatter
+
+# Limit the messages that are printed on the console to FINEST and above.
+
+java.util.logging.ConsoleHandler.level = FINEST
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.genomix.pregelix = INFO
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+#edu.uci.ics.hyracks.level = SEVERE
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
diff --git a/genomix/genomix-driver/src/main/resources/scripts/getip.sh b/genomix/genomix-driver/src/main/resources/scripts/getip.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
new file mode 100755
index 0000000..83a2fd5
--- /dev/null
+++ b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+set -x
+
+GENOMIX_HOME="$( dirname "$( cd "$(dirname "$0")" ; pwd -P )" )"  # script's parent dir's parent
+cd "$GENOMIX_HOME"
+
+. conf/cluster.properties
+. conf/stores.properties
+
+for i in `cat conf/slaves`
+do
+   DIRS=`echo $store | tr "," " "`
+   # ssh to each slave and create the store directories there
+   ssh -n $i "mkdir -p $DIRS"
+
+   DIRS=`echo $IO_DIRS | tr "," " "`
+   ssh -n $i "mkdir -p $DIRS"
+done
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
old mode 100644
new mode 100755
index cf7a0b4..e94ad17
--- a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
@@ -14,7 +14,7 @@
 . conf/cluster.properties
 #Get the IP address of the cc
 CCHOST_NAME=`cat conf/master`
-CCHOST=`bin/getip.sh`
+CCHOST=`ssh -n ${CCHOST_NAME} "${GENOMIX_HOME}/bin/getip.sh"`
 
 #Remove the temp dir
 #rm -rf $CCTMP_DIR
@@ -29,15 +29,30 @@
 export JAVA_OPTS=$CCJAVA_OPTS
 
 cd $CCTMP_DIR
-#Launch hyracks cc script
+#Prepare cc script
+CMD="\"${GENOMIX_HOME}/bin/genomixcc\" -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST"
+
+if [ -n "$CC_CLIENTPORT" ]; then
+  CMD="$CMD -client-net-port $CC_CLIENTPORT"
+fi
+if [ -n "$CC_CLUSTERPORT" ]; then
+  CMD="$CMD -cluster-net-port $CC_CLUSTERPORT"
+fi
+if [ -n "$CC_HTTPPORT" ]; then
+  CMD="$CMD -http-port $CC_HTTPPORT"
+fi
+if [ -n "$JOB_HISTORY_SIZE" ]; then
+  CMD="$CMD -job-history-size $JOB_HISTORY_SIZE"
+fi
 if [ -f "${GENOMIX_HOME}/conf/topology.xml"  ]; then
-#Launch hyracks cc script with topology
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "${GENOMIX_HOME}/conf/topology.xml" &> "$CCLOGS_DIR"/cc.log &
-else
-#Launch hyracks cc script without toplogy
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> "$CCLOGS_DIR"/cc.log &
+CMD="$CMD -cluster-topology \"${GENOMIX_HOME}/conf/topology.xml\""
 fi
 
+#Launch cc script
+printf "\n\n\n********************************************\nStarting CC with command %s\n\n" "$CMD" >> "$CCLOGS_DIR"/cc.log
+eval "$CMD >>\"$CCLOGS_DIR/cc.log\" 2>&1 &"
+
+# save the PID of the process we just launched
 PID=$!
 echo "master: "`hostname`$'\t'$PID
-echo $PID > "$GENOMIX_HOME"/conf/cc.pid
+echo $PID > "$GENOMIX_HOME/conf/cc.pid"
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
old mode 100644
new mode 100755
index 448b17d..02d8f97
--- a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
@@ -57,8 +57,12 @@
   exit 1
 fi
 
-#Launch hyracks nc
-"${GENOMIX_HOME}"/bin/$NCTYPE -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> "$NCLOGS_DIR"/$NODEID.log &
+CMD="\"${GENOMIX_HOME}/bin/$NCTYPE\" -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices \"${IO_DIRS}\""
+
+printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "$CMD" >> "$NCLOGS_DIR"/$NODEID.log
+
+#Launch nc
+eval "$CMD >> \"$NCLOGS_DIR/$NODEID.log\" 2>&1  &"
 
 echo $!  # write PID of bg'ed script
 
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
old mode 100644
new mode 100755
index 41efa0f..ff9dfc4
--- a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
@@ -17,7 +17,8 @@
   echo "Stopped CC on master: "`hostname`$'\t'$PID
 fi
 
-#Clean up CC temp dir
-rm -rf $CCTMP_DIR/*
+#Clean up CC temp dir but keep the default logs directory
+shopt -s extglob
+rm -rf $CCTMP_DIR/!(logs)
 
 
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
old mode 100644
new mode 100755
index 4007089..14df6ba
--- a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
@@ -17,5 +17,6 @@
 	rm -rf $io_dir/*
 done
 
-#Clean up NC temp dir
-rm -rf $NCTMP_DIR/*
+#Clean up NC temp dir but keep the default logs directory
+shopt -s extglob
+rm -rf $NCTMP_DIR/!(logs)
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
index 960a342..0c7c61b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
@@ -159,7 +159,8 @@
 
         String ipAddress = jobConf.get(GenomixJobConf.IP_ADDRESS);
         int port = Integer.parseInt(jobConf.get(GenomixJobConf.PORT));
-        int numOfDuplicate = jobConf.getInt(GenomixJobConf.CORES_PER_MACHINE, 4);
+        String IODirs = jobConf.get(GenomixJobConf.HYRACKS_IO_DIRS, null);
+        int numOfDuplicate = IODirs != null ? IODirs.split(",").length : 4;
         boolean bProfiling = jobConf.getBoolean(GenomixJobConf.PROFILE, true);
         jobConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         jobConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
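
Note: numOfDuplicate is now inferred from the comma-separated HYRACKS_IO_DIRS list rather than the removed CORES_PER_MACHINE option. A self-contained sketch of that inference with sample strings; the class and method names are invented for illustration:

    public class IoDirsSketch {
        // one Hyracks partition per IO device, defaulting to 4 when the property is unset
        static int numOfDuplicate(String ioDirs) {
            return ioDirs != null ? ioDirs.split(",").length : 4;
        }

        public static void main(String[] args) {
            System.out.println(numOfDuplicate("/data/io1,/data/io2,/data/io3")); // 3
            System.out.println(numOfDuplicate(null));                            // 4
        }
    }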
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
index ffdb92e..56b59c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
@@ -80,6 +80,8 @@
 
     protected ConfFactory hadoopJobConfFactory;
     protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private static final int DEFAULT_FRAME_LIMIT = 4096;
+    private static final int DEFAULT_FRAME_SIZE = 65535;
     protected String[] ncNodeNames;
     protected String[] readSchedule;
 
@@ -236,10 +238,10 @@
     protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
         Configuration conf = confFactory.getConf();
         kmerSize = Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH));
-        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
-        System.out.println(GenomixJobConf.DEFAULT_FRAME_SIZE);
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, DEFAULT_FRAME_LIMIT);
+//        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, DEFAULT_FRAME_SIZE);
+        System.out.println(DEFAULT_FRAME_SIZE);
         System.out.println(frameSize);
         String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         groupbyType = GroupbyType.PRECLUSTER;
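
Note: the hard-coded fallbacks above apply only when the corresponding genomix.* keys are absent, so values propagated from conf/cluster.properties still win. A small sketch of that precedence using Hadoop's Configuration; the literal key strings are assumed to match the GenomixJobConf constants:

    import org.apache.hadoop.conf.Configuration;

    public class FrameDefaultsSketch {
        private static final int DEFAULT_FRAME_LIMIT = 4096;
        private static final int DEFAULT_FRAME_SIZE = 65535;

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // unset key -> the local default
            System.out.println(conf.getInt("genomix.framelimit", DEFAULT_FRAME_LIMIT)); // 4096
            // explicitly configured key -> the configured value
            conf.setInt("genomix.framesize", 131070);
            System.out.println(conf.getInt("genomix.framesize", DEFAULT_FRAME_SIZE));   // 131070
        }
    }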
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 85ecf71..b2c3767 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -74,6 +74,13 @@
         if (maxIteration < 0)
             maxIteration = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.GRAPH_CLEAN_MAX_ITERATIONS));
         GenomixJobConf.setGlobalStaticConstants(getContext().getConfiguration());
+
+//        if (getSuperstep() == 1) {
+//            kmerSize = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.KMER_LENGTH));
+//            maxIteration = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.GRAPH_CLEAN_MAX_ITERATIONS));
+//            GenomixJobConf.setGlobalStaticConstants(getContext().getConfiguration());
+//        }
     }
     
     /**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 3520914..8678e9e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -47,8 +47,8 @@
         receivedMsgList.clear();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 42dd1e1..482306a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -82,8 +82,8 @@
         }
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index aecd546..3c9ac3d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -69,8 +69,8 @@
         tmpValue.reset();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
index b7005f9..c796432 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
@@ -50,8 +50,8 @@
         }
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
index dfe76a9..9782637 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
@@ -44,8 +44,8 @@
         super.initVertex();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         if(getSuperstep() == 1)
             ScaffoldingAggregator.preScaffoldingMap.clear();
         else if(getSuperstep() == 2)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 90aa571..a075844 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -97,8 +97,8 @@
             tmpOutgoingEdge = new EdgeWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index eebaab1..679a472 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -41,8 +41,8 @@
             destVertexId = new VKmerBytesWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
index 36a2e54..de2be35 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
@@ -43,8 +43,8 @@
             repeatKmer = new VKmerBytesWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
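
Note: the same initVertex() preamble is repeated verbatim across the vertex classes touched above: clear preGlobalCounters on superstep 1 and leave the readStatisticsCounterResult call disabled on later supersteps. A possible follow-up, not part of this patch, would hoist it into BasicGraphCleanVertex:

    // hypothetical helper in BasicGraphCleanVertex, not in this patch
    protected void resetStatisticsCounters() {
        if (getSuperstep() == 1)
            StatisticsAggregator.preGlobalCounters.clear();
        // on later supersteps this patch deliberately skips:
        // StatisticsAggregator.preGlobalCounters =
        //         readStatisticsCounterResult(getContext().getConfiguration());
        counters.clear();
        getVertexValue().getCounters().clear();
    }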