Merge remote-tracking branch 'origin/genomix/fullstack_genomix' into nanzhang/hyracks_genomix

# By Jake Biesinger
# Via Jake Biesinger
* origin/genomix/fullstack_genomix: (29 commits)
  revert to previous initVertex behavior
  try to have 2 local sets of CCs and NCs (-runLocal not working)
  don't use a max number of readids on edges
  Add full-pipeline counters
  hadoop uses |IO_DIRS| * #slaves reducers
  use $IO_DIRS, not -coresPerMachine for hyracks partitions + hadoop #reducers
  Hadoop graphbuild works!
  hadoop dist cache must be an absolute path
  hadoop jar deployment fix
  shut down previous clusters again
  don't delete the default log directory on cluster shutdown
  read additional ports from .properties; launch multiple CC's on one machine(?)
  tweak cluster shutdown
  read FRAME_SIZE and FRAME_LIMIT from cluster.properties
  Use maven-built app.home when run from Eclipse (must run mvn package once)
  fix shutdown bug when -runLocal=false
  move scripts back (can't run non-local eclipse without mvn package)
  move and chmod +x scripts in driver so eclipse can call them directly
  runLocal is working but doesn't shut down properly
  only shut down pregelix cluster if it's running
  ...
diff --git a/genomix/genomix-data/pom.xml b/genomix/genomix-data/pom.xml
index 18500cc..d164b0c 100644
--- a/genomix/genomix-data/pom.xml
+++ b/genomix/genomix-data/pom.xml
@@ -50,6 +50,11 @@
 			<type>maven-plugin</type>
 		</dependency>
 		<dependency>
+			<groupId>jfree</groupId>
+			<artifactId>jfreechart</artifactId>
+			<version>1.0.13</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 			<version>3.1</version>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
index cfe4dfd..63758f2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
@@ -79,6 +79,11 @@
         @Option(name = "-followsGraphBuild", usage = "whether or not the given input is output from a previous graph-build", required = false)
         private boolean followsGraphBuild = false;
         
+        @Option(name = "-clusterWaitTime", usage = "the amount of time (in ms) to wait between starting/stopping CC/NC", required = false)
+        private int clusterWaitTime = -1;
+        
+        @Option(name = "-drawStatistics", usage = "Plot coverage statistics after graphbuilding stages", required = false)
+        private boolean drawStatistics = false;
 
         // Graph cleaning
         @Option(name = "-bridgeRemove_maxLength", usage = "Nodes with length <= bridgeRemoveLength that bridge separate paths are removed from the graph", required = false)
@@ -115,9 +120,6 @@
         @Option(name = "-profile", usage = "Whether or not to do runtime profifling", required = false)
         private boolean profile = false;
         
-        @Option(name = "-coresPerMachine", usage="the number of cores available in each machine", required=false)
-        private int coresPerMachine = -1;
-        
         @Option(name = "-runLocal", usage = "Run a local instance using the Hadoop MiniCluster. NOTE: overrides settings for -ip and -port and those in conf/*.properties", required=false)
         private boolean runLocal = false;
         
@@ -143,7 +145,6 @@
         TIP_REMOVE,
         SCAFFOLD,
         SPLIT_REPEAT,
-        STATS,
         DUMP_FASTA;
         
         /**
@@ -176,6 +177,8 @@
     public static final String LOCAL_OUTPUT_DIR = "genomix.final.local.output.dir";
     public static final String SAVE_INTERMEDIATE_RESULTS = "genomix.save.intermediate.results";
     public static final String FOLLOWS_GRAPH_BUILD = "genomix.follows.graph.build";
+    public static final String CLUSTER_WAIT_TIME = "genomix.cluster.wait.time";
+    public static final String DRAW_STATISTICS = "genomix.draw.statistics";
     
     // Graph cleaning
     public static final String BRIDGE_REMOVE_MAX_LENGTH = "genomix.bridgeRemove.maxLength";
@@ -194,31 +197,11 @@
     public static final String RUN_LOCAL = "genomix.runLocal";
 
     // TODO should these also be command line options?
-    public static final String CORES_PER_MACHINE = "genomix.driver.duplicate.num";
     public static final String FRAME_SIZE = "genomix.framesize";
     public static final String FRAME_LIMIT = "genomix.framelimit";
-    public static final String TABLE_SIZE = "genomix.tablesize";
     public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
     public static final String OUTPUT_FORMAT = "genomix.graph.output";
 
-    /** Configurations used by hybrid groupby function in graph build phrase */
-    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
-    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
-    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
-    public static final int DEFAULT_FRAME_SIZE = 65536;
-    public static final int DEFAULT_FRAME_LIMIT = 65536;
-    public static final int DEFAULT_TABLE_SIZE = 10485767;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
-    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
-
-    public static final String GROUPBY_TYPE_HYBRID = "hybrid";
-    public static final String GROUPBY_TYPE_EXTERNAL = "external";
     public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
     
     public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
@@ -227,16 +210,21 @@
     public static final String OUTPUT_FORMAT_BINARY = "genomix.outputformat.binary";
     public static final String OUTPUT_FORMAT_TEXT = "genomix.outputformat.text";
     public static final String HDFS_WORK_PATH = "genomix.hdfs.work.path";
+    public static final String HYRACKS_IO_DIRS = "genomix.hyracks.IO_DIRS";
+    public static final String HYRACKS_SLAVES = "genomix.hyracks.slaves.list";
+    
     private static final Patterns[] DEFAULT_PIPELINE_ORDER = {
                     Patterns.BUILD, Patterns.MERGE, 
                     Patterns.LOW_COVERAGE, Patterns.MERGE,
                     Patterns.TIP_REMOVE, Patterns.MERGE,
-//                    Patterns.BUBBLE, Patterns.MERGE,
-//                    Patterns.SPLIT_REPEAT, Patterns.MERGE,
-//                    Patterns.SCAFFOLD, Patterns.MERGE
+                    Patterns.BUBBLE, Patterns.MERGE,
+                    Patterns.SPLIT_REPEAT, Patterns.MERGE,
+                    Patterns.SCAFFOLD, Patterns.MERGE
             };
     
     
+    
+    
     private String[] extraArguments = {};
     
     private static Map<String, Long> tickTimes = new HashMap<String, Long>(); 
@@ -355,10 +343,13 @@
             set(HDFS_WORK_PATH, "genomix_out");  // should be in the user's home directory? 
         
         // hyracks-specific
-        if (getInt(CORES_PER_MACHINE, -1) == -1)
-            setInt(CORES_PER_MACHINE, 4);
-        if (getInt(FRAME_SIZE, -1) == -1)
-            setInt(FRAME_SIZE, DEFAULT_FRAME_SIZE);
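+        // default: wait 6 seconds between starting/stopping the CC/NCs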
+        if (getInt(CLUSTER_WAIT_TIME, -1) == -1)
+            setInt(CLUSTER_WAIT_TIME, 6000);
+        
+        // normalize DRAW_STATISTICS to an explicit boolean setting
+        // (it defaults to false when unset)
+        setBoolean(DRAW_STATISTICS,
+                getBoolean(DRAW_STATISTICS, false));
         
 //        if (getBoolean(RUN_LOCAL, false)) {
 //            // override any other settings for HOST and PORT
@@ -389,12 +380,9 @@
             set(HDFS_WORK_PATH, opts.hdfsWorkPath);
         setBoolean(SAVE_INTERMEDIATE_RESULTS, opts.saveIntermediateResults);
         setBoolean(FOLLOWS_GRAPH_BUILD, opts.followsGraphBuild);
+        setInt(CLUSTER_WAIT_TIME, opts.clusterWaitTime);
+        setBoolean(DRAW_STATISTICS, opts.drawStatistics);
             
-
-//        if (opts.runLocal && (opts.ipAddress != null || opts.port != -1))
-//            throw new IllegalArgumentException("Option -runLocal cannot be set at the same time as -port or -ip! (-runLocal starts a cluster; -ip and -port specify an existing cluster)");
-        if (opts.runLocal)
-            throw new IllegalArgumentException("runLocal is currently unsupported!");
         setBoolean(RUN_LOCAL, opts.runLocal);
         
         // Hyracks/Pregelix Setup
@@ -402,7 +390,6 @@
             set(IP_ADDRESS, opts.ipAddress);
         setInt(PORT, opts.port);
         setBoolean(PROFILE, opts.profile);
-        setInt(CORES_PER_MACHINE, opts.coresPerMachine);
         
         // Graph cleaning
         setInt(BRIDGE_REMOVE_MAX_LENGTH, opts.bridgeRemove_maxLength);
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
new file mode 100644
index 0000000..20bfceb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.jfree.chart.ChartFactory;
+import org.jfree.chart.ChartUtilities;
+import org.jfree.chart.JFreeChart;
+import org.jfree.chart.plot.PlotOrientation;
+import org.jfree.data.xy.XYDataset;
+import org.jfree.data.xy.XYSeries;
+import org.jfree.data.xy.XYSeriesCollection;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DriverUtils {
+
+    public static final Log LOG = LogFactory.getLog(DriverUtils.class);
+
+    /*
+     * Get the IP address of the master node using the bin/getip.sh script
+     */
+    public static String getIP(String hostName) throws IOException, InterruptedException {
+        String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
+                + File.separator + "getip.sh\"";
+        Process p = Runtime.getRuntime().exec(getIPCmd);
+        p.waitFor(); // wait for ssh 
+        String stdout = IOUtils.toString(p.getInputStream()).trim();
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
+                    + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
+        return stdout;
+    }
+
+    /**
+     * set the CC's IP address and port from the cluster.properties and `getip.sh` script
+     */
+    public static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
+        Properties CCProperties = new Properties();
+        CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
+                + File.separator + "cluster.properties"));
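+        // values already set (e.g., from the command line) take precedence;
+        // only fill in settings that are still unset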
+        if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
+            conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
+        if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
+            conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
+        }
+        if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
+            conf.set(GenomixJobConf.FRAME_SIZE, CCProperties.getProperty("FRAME_SIZE"));
+        if (conf.get(GenomixJobConf.FRAME_LIMIT) == null)
+            conf.set(GenomixJobConf.FRAME_LIMIT, CCProperties.getProperty("FRAME_LIMIT"));
+        if (conf.get(GenomixJobConf.HYRACKS_IO_DIRS) == null)
+            conf.set(GenomixJobConf.HYRACKS_IO_DIRS, CCProperties.getProperty("IO_DIRS"));
+        if (conf.get(GenomixJobConf.HYRACKS_SLAVES) == null) {
+            String slaves = FileUtils.readFileToString(new File(System.getProperty("app.home", ".") + File.separator + "conf"
+                    + File.separator + "slaves"));
+            conf.set(GenomixJobConf.HYRACKS_SLAVES, slaves);
+        }
+    }
+    
+    public static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
+        LOG.info("Getting coverage statistics...");
+        GenomixJobConf.tick("drawStatistics");
+        FileSystem dfs = FileSystem.get(conf);
+
+        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
+        SequenceFile.Reader reader = null;
+        VKmerBytesWritable key = null;
+        NodeWritable value = null;
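+        // histogram: rounded average coverage -> number of nodes at that coverage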
+        TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
+        FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
+                        Integer cov = Math.round(value.getAverageCoverage());
+                        Long count = coverageCounts.get(cov);
+                        if (count == null)
+                            coverageCounts.put(cov, Long.valueOf(1));
+                        else
+                            coverageCounts.put(cov, count + 1);
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+            }
+        }
+
+        XYSeries series = new XYSeries("Kmer Coverage");
+        for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
+            series.add(pair.getKey().floatValue(), pair.getValue().longValue());
+        }
+        XYDataset xyDataset = new XYSeriesCollection(series);
+        JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
+                "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
+
+        // Write the data to the output stream:
+        FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
+        ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
+        chartOut.flush();
+        chartOut.close();
+        LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
+    }
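+
+    /*
+     * A hypothetical sketch (not part of this codebase) of the aggregating
+     * Hadoop job suggested in the comment above: a map-only job that buckets
+     * each node by rounded coverage and bumps one counter per bucket, so the
+     * driver could read job counters instead of re-scanning every sequence file:
+     *
+     *     public static class CoverageMapper extends MapReduceBase implements
+     *             Mapper<VKmerBytesWritable, NodeWritable, NullWritable, NullWritable> {
+     *         public void map(VKmerBytesWritable key, NodeWritable value,
+     *                 OutputCollector<NullWritable, NullWritable> out, Reporter reporter) {
+     *             reporter.incrCounter("coverage",
+     *                     String.valueOf(Math.round(value.getAverageCoverage())), 1);
+     *         }
+     *     }
+     */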
+
+    public static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
+            throws IOException {
+        LOG.info("Dumping graph to fasta...");
+        GenomixJobConf.tick("dumpGraph");
+        FileSystem dfs = FileSystem.get(conf);
+
+        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
+        SequenceFile.Reader reader = null;
+        VKmerBytesWritable key = null;
+        NodeWritable value = null;
+        BufferedWriter bw = null;
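+        // each node becomes one FASTA record: a ">node_<kmer>" header line
+        // followed by the node's sequence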
+        FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    if (bw == null)
+                        bw = new BufferedWriter(new FileWriter(outputFasta));
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
+                        bw.write(">node_" + key.toString() + "\n");
+                        bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
+                        bw.newLine();
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+            }
+        }
+        if (bw != null)
+            bw.close();
+        LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
+    }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
new file mode 100644
index 0000000..5d41ae2
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
+
+/**
+ * Manages the lifecycle of the Hyracks, Pregelix, and Hadoop clusters used by
+ * the pipeline, either as local in-process miniclusters (-runLocal) or via the
+ * bin/ start and stop scripts on a real cluster.
+ */
+public class GenomixClusterManager {
+
+    public enum ClusterType {
+        HYRACKS,
+        PREGELIX,
+        HADOOP
+    }
+
+    private static final Log LOG = LogFactory.getLog(GenomixClusterManager.class);
+    public static final String LOCAL_HOSTNAME = "localhost";
+    public static final String LOCAL_IP = "127.0.0.1";
+    public static final int LOCAL_HYRACKS_CLIENT_PORT = 3099;
+    public static final int LOCAL_HYRACKS_CC_PORT = 1099;
+    public static final int LOCAL_PREGELIX_CLIENT_PORT = 3097;
+    public static final int LOCAL_PREGELIX_CC_PORT = 1097;
+
+    private ClusterControllerService localHyracksCC;
+    private NodeControllerService localHyracksNC;
+    private ClusterControllerService localPregelixCC;
+    private NodeControllerService localPregelixNC;
+    private MiniDFSCluster localDFSCluster;
+    private MiniMRCluster localMRCluster;
+
+    private final boolean runLocal;
+    private final GenomixJobConf conf;
+    private boolean jarsCopiedToHadoop = false;
+
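+    // one JVM shutdown hook per cluster type, so an aborted run still stops its clusters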
+    private HashMap<ClusterType, Thread> shutdownHooks = new HashMap<ClusterType, Thread>();
+
+    public GenomixClusterManager(boolean runLocal, GenomixJobConf conf) {
+        this.runLocal = runLocal;
+        this.conf = conf;
+    }
+
+    /**
+     * Start a cluster of the given type. If runLocal is specified, we will create an in-memory version of the cluster.
+     */
+    public void startCluster(ClusterType clusterType) throws Exception {
+        addClusterShutdownHook(clusterType);
+        switch (clusterType) {
+            case HYRACKS:
+            case PREGELIX:
+                if (runLocal) {
+                    startLocalCC(clusterType);
+                    startLocalNC(clusterType);
+                } else {
+                    int sleepms = Integer.parseInt(conf.get(GenomixJobConf.CLUSTER_WAIT_TIME));
+                    startCC(sleepms);
+                    startNCs(clusterType, sleepms);
+                }
+                break;
+            case HADOOP:
+                if (runLocal)
+                    startLocalMRCluster();
+                else
+                    deployJarsToHadoop();
+                break;
+        }
+    }
+
+    public void stopCluster(ClusterType clusterType) throws Exception {
+        switch (clusterType) {
+            case HYRACKS:
+                if (runLocal) {
+                    if (localHyracksCC != null) {
+//                        localHyracksCC.stop();
+                        localHyracksCC = null;
+                    }
+                    if (localHyracksNC != null) {
+//                        localHyracksNC.stop();
+                        localHyracksNC = null;
+                    }
+                } else {
+                    shutdownCC();
+                    shutdownNCs();
+                }
+                break;
+            case PREGELIX:
+                if (runLocal) {
+                    if (localPregelixCC != null) {
+//                        localPregelixCC.stop();
+                        localPregelixCC = null;
+                    }
+                    if (localPregelixNC != null) {
+//                        localPregelixNC.stop();
+                        localPregelixNC = null;
+
+                    }
+                } else {
+                    shutdownCC();
+                    shutdownNCs();
+                }
+                break;
+            case HADOOP:
+                if (runLocal) {
+                    if (localMRCluster != null) {
+                        localMRCluster.shutdown();
+                        localMRCluster = null;
+                    }
+                    if (localDFSCluster != null) {
+                        localDFSCluster.shutdown();
+                        localDFSCluster = null;
+                    }
+                }
+                break;
+        }
+        removeClusterShutdownHook(clusterType);
+    }
+
+    private void startLocalCC(ClusterType clusterType) throws Exception {
+        LOG.info("Starting local CC...");
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = LOCAL_HOSTNAME;
+        ccConfig.clusterNetIpAddress = LOCAL_HOSTNAME;
+        ccConfig.defaultMaxJobAttempts = 0;
+        ccConfig.jobHistorySize = 1;
+        ccConfig.profileDumpPeriod = -1;
+
+        if (clusterType == ClusterType.HYRACKS) {
+            ccConfig.clusterNetPort = LOCAL_HYRACKS_CC_PORT;
+            ccConfig.clientNetPort = LOCAL_HYRACKS_CLIENT_PORT;
+            localHyracksCC = new ClusterControllerService(ccConfig);
+            localHyracksCC.start();
+        } else if (clusterType == ClusterType.PREGELIX) {
+            ccConfig.clusterNetPort = LOCAL_PREGELIX_CC_PORT;
+            ccConfig.clientNetPort = LOCAL_PREGELIX_CLIENT_PORT;
+            localPregelixCC = new ClusterControllerService(ccConfig);
+            localPregelixCC.start();
+        } else {
+            throw new IllegalArgumentException("Invalid CC type: " + clusterType);
+        }
+    }
+
+    private void startLocalNC(ClusterType clusterType) throws Exception {
+        LOG.info("Starting local NC...");
+        //        ClusterConfig.setClusterPropertiesPath(System.getProperty("app.home") + "/conf/cluster.properties");
+        //        ClusterConfig.setStorePath(...);
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.ccHost = LOCAL_HOSTNAME;
+        ncConfig.clusterNetIPAddress = LOCAL_HOSTNAME;
+        ncConfig.dataIPAddress = LOCAL_IP;
+        ncConfig.datasetIPAddress = LOCAL_IP;
+        ncConfig.nodeId = "nc-" + clusterType;
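+        // IO devices live under a relative ./tmp/t3 scratch directory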
+        ncConfig.ioDevices = "tmp" + File.separator + "t3";
+
+        if (clusterType == ClusterType.HYRACKS) {
+            ncConfig.ccPort = LOCAL_HYRACKS_CC_PORT;
+            localHyracksNC = new NodeControllerService(ncConfig);
+            localHyracksNC.start();
+        } else if (clusterType == ClusterType.PREGELIX) {
+            ncConfig.ccPort = LOCAL_PREGELIX_CC_PORT;
+            ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
+            localPregelixNC = new NodeControllerService(ncConfig);
+            localPregelixNC.start();
+        } else {
+            throw new IllegalArgumentException("Invalid NC type: " + clusterType);
+        }
+    }
+
+    private void startLocalMRCluster() throws IOException {
+        LOG.info("Starting local DFS and MR cluster...");
+        localDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+        localMRCluster = new MiniMRCluster(1, localDFSCluster.getFileSystem().getUri().toString(), 1);
+    }
+
+    /**
+     * Walk the current CLASSPATH to get all jar's in use and copy them up to all HDFS nodes
+     * 
+     * @throws IOException
+     */
+    private void deployJarsToHadoop() throws IOException {
+        if (!jarsCopiedToHadoop) {
+            LOG.info("Deploying jars in my classpath to HDFS Distributed Cache...");
+            FileSystem dfs = FileSystem.get(conf);
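+            // consider both the shell's CLASSPATH and the JVM's java.class.path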
+            String[] classPath = { System.getenv().get("CLASSPATH"), System.getProperty("java.class.path") };
+            for (String cp : classPath) {
+                if (cp == null)
+                    continue;
+                for (String item : cp.split(":")) {
+                    //                    LOG.info("Checking " + item);
+                    if (item.endsWith(".jar")) {
+                        //                        LOG.info("Deploying " + item);
+                        Path localJar = new Path(item);
+                        Path jarDestDir = new Path(conf.get(GenomixJobConf.HDFS_WORK_PATH) + "/jar-dependencies");
+                        // dist cache requires absolute paths. we have to use the working directory if HDFS_WORK_PATH is relative
+                        if (!jarDestDir.isAbsolute()) {
+                            // working dir is the correct base, but we must use the path version (not a URI). Get URI and strip out leading identifiers
+                            String hostNameRE = "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*";
+                            String[] workDirs = dfs.getWorkingDirectory().toString()
+                                    .split("(hdfs://" + hostNameRE + ":\\d+|file:)", 2);
+                            if (workDirs.length <= 1) {
+                                LOG.info("Weird.... didn't find a URI header matching hdfs://host:port or file:  Just using the original instead.");
+                                jarDestDir = new Path(dfs.getWorkingDirectory() + File.separator + jarDestDir);
+                            } else {
+                                jarDestDir = new Path(workDirs[1] + File.separator + jarDestDir);
+                            }
+                        }
+                        dfs.mkdirs(jarDestDir);
+                        Path destJar = new Path(jarDestDir + File.separator + localJar.getName());
+                        dfs.copyFromLocalFile(localJar, destJar);
+                        //                        LOG.info("Jar in distributed cache: " + destJar);
+                        DistributedCache.addFileToClassPath(destJar, conf);
+                    }
+                }
+            }
+        }
+    }
+
+    private static void startNCs(ClusterType type, int sleepms) throws IOException, InterruptedException {
+        LOG.info("Starting NC's");
+        String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "startAllNCs.sh " + type;
+        Process p = Runtime.getRuntime().exec(startNCCmd);
+        p.waitFor(); // wait for ssh 
+        Thread.sleep(sleepms); // wait for NC -> CC registration
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
+                    + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+                    + IOUtils.toString(p.getErrorStream()));
+    }
+
+    private static void startCC(int sleepms) throws IOException, InterruptedException {
+        LOG.info("Starting CC");
+        String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "startcc.sh";
+        Process p = Runtime.getRuntime().exec(startCCCmd);
+        p.waitFor(); // wait for cmd execution
+        Thread.sleep(sleepms); // wait for CC registration
+        if (p.exitValue() != 0)
+            throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
+                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+                    + IOUtils.toString(p.getErrorStream()));
+    }
+
+    private static void shutdownCC() throws IOException, InterruptedException {
+        LOG.info("Shutting down any previous CC");
+        String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
+        Process p = Runtime.getRuntime().exec(stopCCCmd);
+        p.waitFor(); // wait for cmd execution
+    }
+
+    private static void shutdownNCs() throws IOException, InterruptedException {
+        LOG.info("Shutting down any previous NC's");
+        String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "stopAllNCs.sh";
+        Process p = Runtime.getRuntime().exec(stopNCCmd);
+        p.waitFor(); // wait for ssh 
+    }
+
+    private void addClusterShutdownHook(final ClusterType clusterType) {
+        if (shutdownHooks.containsKey(clusterType))
+            throw new IllegalArgumentException("Already specified a hook for shutting down a " + clusterType
+                    + " cluster! (Try removing the existing hook first?)");
+        Thread hook = new Thread() {
+            @Override
+            public void run() {
+                LOG.info("Shutting down the cluster...");
+                try {
+                    stopCluster(clusterType);
+                } catch (Exception e) {
+                    System.err.println("Error while shutting the cluster down:");
+                    e.printStackTrace();
+                }
+            }
+        };
+        shutdownHooks.put(clusterType, hook);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    private void removeClusterShutdownHook(final ClusterType clusterType) {
+        if (!shutdownHooks.containsKey(clusterType))
+            //            throw new IllegalArgumentException("There is no shutdown hook for " + clusterType + "!");
+            return; // ignore-- we are cleaning up after a previous run
+        try {
+            Runtime.getRuntime().removeShutdownHook(shutdownHooks.get(clusterType));
+        } catch (IllegalStateException e) {
+            // ignore: we must already be shutting down
+        }
+    }
+
+    public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
+        LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
+        GenomixJobConf.tick("copyLocalToHDFS");
+        FileSystem dfs = FileSystem.get(conf);
+        Path dest = new Path(destDir);
+        dfs.delete(dest, true);
+        dfs.mkdirs(dest);
+
+        File srcBase = new File(localDir);
+        if (srcBase.isDirectory())
+            for (File f : srcBase.listFiles())
+                dfs.copyFromLocalFile(new Path(f.toString()), dest);
+        else
+            dfs.copyFromLocalFile(new Path(localDir), dest);
+
+        LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
+    }
+
+    public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
+        LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
+        GenomixJobConf.tick("copyBinToLocal");
+        FileSystem dfs = FileSystem.get(conf);
+        FileUtils.deleteQuietly(new File(localDestDir));
+
+        // save original binary to output/bin
+        dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
+
+        // convert hdfs sequence files to text as output/text
+        BufferedWriter bw = null;
+        SequenceFile.Reader reader = null;
+        Writable key = null;
+        Writable value = null;
+        FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
+        for (FileStatus f : files) {
+            if (f.getLen() != 0 && !f.isDir()) {
+                try {
+                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                    key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    if (bw == null)
+                        bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
+                    while (reader.next(key, value)) {
+                        if (key == null || value == null)
+                            break;
+                        bw.write(key.toString() + "\t" + value.toString());
+                        bw.newLine();
+                    }
+                } catch (Exception e) {
+                    System.out.println("Encountered an error copying " + f + " to local:\n" + e);
+                } finally {
+                    if (reader != null)
+                        reader.close();
+                }
+
+            }
+        }
+        if (bw != null)
+            bw.close();
+        LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
deleted file mode 100644
index 28d6572..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package edu.uci.ics.genomix.minicluster;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.wicket.util.file.File;
-
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class GenomixMiniCluster {
-    
-    public static final String NC1_ID = "nc1";
-    public static final int TEST_HYRACKS_CC_PORT = 1099;
-    public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
-    public static final String CC_HOST = "localhost";
-
-    private static ClusterControllerService cc;
-    private static NodeControllerService nc1;
-    private static MiniDFSCluster dfsCluster;
-    private static Path clusterProperties;
-    private static Path clusterStores;
-    private static Path clusterWorkDir;
-    private static Path clusterStoresDir;
-    
-    public static void init(GenomixJobConf conf) throws Exception {
-        makeLocalClusterConfig();
-        ClusterConfig.setClusterPropertiesPath(clusterProperties.toAbsolutePath().toString());
-        ClusterConfig.setStorePath(clusterStores.toAbsolutePath().toString());
-//        dfsCluster = new MiniDFSCluster(conf, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)), true, null);
-        
-        // cluster controller
-        CCConfig ccConfig = new CCConfig();
-        ccConfig.clientNetIpAddress = CC_HOST;
-        ccConfig.clusterNetIpAddress = CC_HOST;
-        ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
-        ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
-        ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 1;
-        ccConfig.profileDumpPeriod = -1;
-        cc = new ClusterControllerService(ccConfig);
-        cc.start();
-
-        // one node controller
-        NCConfig ncConfig1 = new NCConfig();
-        ncConfig1.ccHost = "localhost";
-        ncConfig1.clusterNetIPAddress = "localhost";
-        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
-        ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
-        ncConfig1.nodeId = NC1_ID;
-        ncConfig1.ioDevices = clusterWorkDir.toString() + File.separator + "tmp" + File.separator + "t3";
-        ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
-        nc1 = new NodeControllerService(ncConfig1);
-        nc1.start();
-    }
-
-    public static void deinit() throws Exception {
-        nc1.stop();
-        cc.stop();
-//        dfsCluster.shutdown();
-        FileUtils.deleteQuietly(new File("build"));
-        FileUtils.deleteQuietly(clusterProperties.toFile());
-        FileUtils.deleteQuietly(clusterStores.toFile());
-        FileUtils.deleteQuietly(clusterWorkDir.toFile());
-        FileUtils.deleteQuietly(clusterStoresDir.toFile());
-    }
-
-    /**
-     * create the necessary .properties and directories for a minicluster instance 
-     */
-    private static void makeLocalClusterConfig() throws IOException {
-        clusterProperties = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.cluster", ".properties");
-        clusterWorkDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterWorkDir");
-        PrintWriter writer = new PrintWriter(clusterProperties.toString(), "US-ASCII");
-        writer.println("CC_CLIENTPORT=" + TEST_HYRACKS_CC_CLIENT_PORT);
-        writer.println("CC_CLUSTERPORT=" + TEST_HYRACKS_CC_PORT);
-        writer.println(String.format("WORKPATH=%s", clusterWorkDir.toAbsolutePath().toString()));
-        writer.println("CCTMP_DIR=${WORKPATH}/tmp/t1");
-        writer.println("NCTMP_DIR=${WORKPATH}/tmp/t2");
-        writer.println("CCLOGS_DIR=$CCTMP_DIR/logs");
-        writer.println("NCLOGS_DIR=$NCTMP_DIR/logs");
-        writer.println("IO_DIRS=${WORKPATH}/tmp/t3");
-        writer.println("JAVA_HOME=$JAVA_HOME");
-        writer.println("CLASSPATH=\"${HADOOP_HOME}:${CLASSPATH}:.\"");
-        writer.println("FRAME_SIZE=65536");
-        writer.println("CCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
-        writer.println("NCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
-        writer.close();
-
-        clusterStoresDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterStores");
-        clusterStores = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.stores", ".properties");
-        writer = new PrintWriter(clusterStores.toString(), "US-ASCII");
-        writer.println(String.format("store=%s", clusterStoresDir.toString()));
-        writer.close();
-    }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index 758051d..968c492 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -331,7 +331,8 @@
         for (EdgeWritable e : edges) {
             VKmerBytesWritable key = e.getKey();
             if (unionEdges.containsKey(key)) {
-                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+//                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+                unionEdges.get(key).unionUpdate(e.getReadIDs());
             }
             else {
                 unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
@@ -340,7 +341,8 @@
         for (EdgeWritable e : other.edges) {
             VKmerBytesWritable key = e.getKey();
             if (unionEdges.containsKey(key)) {
-                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+//                unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+                unionEdges.get(key).unionUpdate(e.getReadIDs());
             }
             else {
                 unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
deleted file mode 100644
index f3a4d6a..0000000
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
+++ /dev/null
@@ -1,316 +0,0 @@
-package edu.uci.ics.genomix.driver;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.jfree.chart.ChartFactory;
-import org.jfree.chart.ChartUtilities;
-import org.jfree.chart.JFreeChart;
-import org.jfree.chart.plot.PlotOrientation;
-import org.jfree.data.xy.XYDataset;
-import org.jfree.data.xy.XYSeries;
-import org.jfree.data.xy.XYSeriesCollection;
-
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class DriverUtils {
-
-    enum NCTypes {
-        HYRACKS,
-        PREGELIX
-    }
-
-    private static final Log LOG = LogFactory.getLog(DriverUtils.class);
-
-    /*
-     * Get the IP address of the master node using the bin/getip.sh script
-     */
-    static String getIP(String hostName) throws IOException, InterruptedException {
-        String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
-                + File.separator + "getip.sh\"";
-        Process p = Runtime.getRuntime().exec(getIPCmd);
-        p.waitFor(); // wait for ssh 
-        String stdout = IOUtils.toString(p.getInputStream()).trim();
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
-                    + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
-        return stdout;
-    }
-
-    /**
-     * set the CC's IP address and port from the cluster.properties and `getip.sh` script
-     */
-    static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
-        Properties CCProperties = new Properties();
-        CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
-                + File.separator + "cluster.properties"));
-        if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
-            conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
-        if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
-            conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
-        }
-        if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
-            conf.set(GenomixJobConf.FRAME_SIZE,
-                    CCProperties.getProperty("FRAME_SIZE", String.valueOf(GenomixJobConf.DEFAULT_FRAME_SIZE)));
-    }
-
-    static void startNCs(NCTypes type) throws IOException {
-        LOG.info("Starting NC's");
-        String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "startAllNCs.sh " + type;
-        Process p = Runtime.getRuntime().exec(startNCCmd);
-        try {
-            p.waitFor(); // wait for ssh 
-            Thread.sleep(5000); // wait for NC -> CC registration
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
-                    + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void shutdownNCs() throws IOException {
-        LOG.info("Shutting down any previous NC's");
-        String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "stopAllNCs.sh";
-        Process p = Runtime.getRuntime().exec(stopNCCmd);
-        try {
-            p.waitFor(); // wait for ssh 
-            //            Thread.sleep(3000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    static void startCC() throws IOException {
-        LOG.info("Starting CC");
-        String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
-                + "startcc.sh";
-        Process p = Runtime.getRuntime().exec(startCCCmd);
-        try {
-            p.waitFor(); // wait for cmd execution
-            Thread.sleep(6000); // wait for CC registration
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        if (p.exitValue() != 0)
-            throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
-                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void shutdownCC() throws IOException {
-        LOG.info("Shutting down CC");
-        String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
-        Process p = Runtime.getRuntime().exec(stopCCCmd);
-        try {
-            p.waitFor(); // wait for cmd execution
-            //            Thread.sleep(2000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        //        if (p.exitValue() != 0)
-        //            throw new RuntimeException("Failed to stop the genomix CC! Script returned exit code: " + p.exitValue()
-        //                    + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-        //                    + IOUtils.toString(p.getErrorStream()));
-    }
-
-    static void addClusterShutdownHook(final boolean runLocal) {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                LOG.info("Shutting down the cluster");
-                try {
-                    if (runLocal)
-                        GenomixMiniCluster.deinit();
-                    else {
-                        DriverUtils.shutdownNCs();
-                        DriverUtils.shutdownCC();
-                    }
-                } catch (Exception e) {
-                    System.err.println("Error while shutting the cluster down:");
-                    e.printStackTrace();
-                }
-            }
-        });
-    }
-
-    public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
-        LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
-        GenomixJobConf.tick("copyLocalToHDFS");
-        FileSystem dfs = FileSystem.get(conf);
-        Path dest = new Path(destDir);
-        dfs.delete(dest, true);
-        dfs.mkdirs(dest);
-
-        File srcBase = new File(localDir);
-        if (srcBase.isDirectory())
-            for (File f : srcBase.listFiles())
-                dfs.copyFromLocalFile(new Path(f.toString()), dest);
-        else
-            dfs.copyFromLocalFile(new Path(localDir), dest);
-
-        LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
-    }
-
-    public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
-        LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
-        GenomixJobConf.tick("copyBinToLocal");
-        FileSystem dfs = FileSystem.get(conf);
-        FileUtils.deleteQuietly(new File(localDestDir));
-
-        // save original binary to output/bin
-        dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
-
-        // convert hdfs sequence files to text as output/text
-        BufferedWriter bw = null;
-        SequenceFile.Reader reader = null;
-        Writable key = null;
-        Writable value = null;
-        FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0 && !f.isDir()) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    if (bw == null)
-                        bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        bw.write(key.toString() + "\t" + value.toString());
-                        bw.newLine();
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error copying " + f + " to local:\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-
-            }
-        }
-        if (bw != null)
-            bw.close();
-        LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
-    }
-
-    static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
-        LOG.info("Getting coverage statistics...");
-        GenomixJobConf.tick("drawStatistics");
-        FileSystem dfs = FileSystem.get(conf);
-
-        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
-        SequenceFile.Reader reader = null;
-        VKmerBytesWritable key = null;
-        NodeWritable value = null;
-        TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
-        FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        Integer cov = java.lang.Math.round(value.getAverageCoverage());
-                        Long count = coverageCounts.get(cov);
-                        if (count == null)
-                            coverageCounts.put(cov, new Long(1));
-                        else
-                            coverageCounts.put(cov, count + 1);
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-            }
-        }
-
-        XYSeries series = new XYSeries("Kmer Coverage");
-        for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
-            series.add(pair.getKey().floatValue(), pair.getValue().longValue());
-        }
-        XYDataset xyDataset = new XYSeriesCollection(series);
-        JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
-                "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
-
-        // Write the data to the output stream:
-        FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
-        ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
-        chartOut.flush();
-        chartOut.close();
-        LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
-    }
-
-    static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
-            throws IOException {
-        LOG.info("Dumping graph to fasta...");
-        GenomixJobConf.tick("dumpGraph");
-        FileSystem dfs = FileSystem.get(conf);
-
-        // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
-        SequenceFile.Reader reader = null;
-        VKmerBytesWritable key = null;
-        NodeWritable value = null;
-        BufferedWriter bw = null;
-        FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
-        for (FileStatus f : files) {
-            if (f.getLen() != 0) {
-                try {
-                    reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
-                    key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    if (bw == null)
-                        bw = new BufferedWriter(new FileWriter(outputFasta));
-                    while (reader.next(key, value)) {
-                        if (key == null || value == null)
-                            break;
-                        bw.write(">node_" + key.toString() + "\n");
-                        bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
-                        bw.newLine();
-                    }
-                } catch (Exception e) {
-                    System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
-                } finally {
-                    if (reader != null)
-                        reader.close();
-                }
-            }
-        }
-        if (bw != null)
-            bw.close();
-        LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
-    }
-
-}
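Note: the drawStatistics/dumpGraph helpers deleted here live on in edu.uci.ics.genomix.minicluster.DriverUtils (see the import swap in GenomixDriver below). The plotting itself is a small JFreeChart recipe; a minimal, self-contained sketch of the same chart follows, assuming a precomputed coverage histogram in place of the SequenceFile scan (the literal counts are hypothetical):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.util.Map.Entry;
    import java.util.TreeMap;

    import org.jfree.chart.ChartFactory;
    import org.jfree.chart.ChartUtilities;
    import org.jfree.chart.JFreeChart;
    import org.jfree.chart.plot.PlotOrientation;
    import org.jfree.data.xy.XYSeries;
    import org.jfree.data.xy.XYSeriesCollection;

    public class CoverageChartSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical histogram: rounded kmer coverage -> count (normally built from the graph)
            TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
            coverageCounts.put(1, 500L);
            coverageCounts.put(5, 1200L);
            coverageCounts.put(30, 800L);

            XYSeries series = new XYSeries("Kmer Coverage");
            for (Entry<Integer, Long> pair : coverageCounts.entrySet())
                series.add(pair.getKey().doubleValue(), pair.getValue().doubleValue());

            JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer", "Coverage", "Count",
                    new XYSeriesCollection(series), PlotOrientation.VERTICAL, true, true, false);

            // same 800x600 PNG output as the pipeline writes
            FileOutputStream chartOut = new FileOutputStream(new File("coverage.png"));
            ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
            chartOut.close();
        }
    }

The TreeMap keeps the coverage bins sorted, so the XYSeries comes out in x-order without any extra sorting step.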
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
index adfae45..8da514c 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
@@ -35,9 +35,10 @@
 
 import edu.uci.ics.genomix.config.GenomixJobConf;
 import edu.uci.ics.genomix.config.GenomixJobConf.Patterns;
-import edu.uci.ics.genomix.driver.DriverUtils.NCTypes;
 import edu.uci.ics.genomix.hyracks.graph.driver.Driver.Plan;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
+import edu.uci.ics.genomix.minicluster.DriverUtils;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager.ClusterType;
 import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
 import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -64,43 +65,51 @@
     private String prevOutput;
     private String curOutput;
     private int stepNum;
-    private List<PregelixJob> jobs;
+    private List<PregelixJob> pregelixJobs;
     private boolean followingBuild = false; // need to adapt the graph immediately after building
+    private boolean runLocal;
+    private int numCoresPerMachine;
+    private int numMachines;
 
+    private GenomixClusterManager manager;
     private edu.uci.ics.genomix.hyracks.graph.driver.Driver hyracksDriver;
     private edu.uci.ics.pregelix.core.driver.Driver pregelixDriver;
 
-    private void buildGraphWithHyracks(GenomixJobConf conf) throws NumberFormatException, IOException {
-        DriverUtils.shutdownNCs();
-        DriverUtils.shutdownCC();
-        DriverUtils.startCC();
-        DriverUtils.startNCs(NCTypes.HYRACKS);
+    private void buildGraphWithHyracks(GenomixJobConf conf) throws Exception {
         LOG.info("Building Graph using Hyracks...");
+        manager.startCluster(ClusterType.HYRACKS);
         GenomixJobConf.tick("buildGraphWithHyracks");
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(conf.get(GenomixJobConf.IP_ADDRESS),
-                Integer.parseInt(conf.get(GenomixJobConf.PORT)), Integer.parseInt(conf
-                        .get(GenomixJobConf.CORES_PER_MACHINE)));
+        hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(
+                runLocal ? GenomixClusterManager.LOCAL_IP : conf.get(GenomixJobConf.IP_ADDRESS),
+                runLocal ? GenomixClusterManager.LOCAL_HYRACKS_CLIENT_PORT : Integer.parseInt(conf.get(GenomixJobConf.PORT)), 
+                        numCoresPerMachine);
         hyracksDriver.runJob(conf, Plan.BUILD_UNMERGED_GRAPH, Boolean.parseBoolean(conf.get(GenomixJobConf.PROFILE)));
         followingBuild = true;
+        manager.stopCluster(ClusterType.HYRACKS);
         LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHyracks") + "ms");
+        if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+            DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
     }
 
-    private void buildGraphWithHadoop(GenomixJobConf conf) throws IOException {
+    private void buildGraphWithHadoop(GenomixJobConf conf) throws Exception {
         LOG.info("Building Graph using Hadoop...");
+        manager.startCluster(ClusterType.HADOOP);
         GenomixJobConf.tick("buildGraphWithHadoop");
         DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF)));
         conf.writeXml(confOutput);
         confOutput.close();
-        // TODO copy the jars up to DFS
         edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver hadoopDriver = new edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver();
-        hadoopDriver.run(prevOutput, curOutput, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)),
+        hadoopDriver.run(prevOutput, curOutput, numCoresPerMachine * numMachines,
                 Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH)), 4 * 100000, true, HADOOP_CONF);
         FileUtils.deleteQuietly(new File(HADOOP_CONF));
         System.out.println("Finished job Hadoop-Build-Graph");
         followingBuild = true;
-        LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop"));
+        manager.stopCluster(ClusterType.HADOOP);
+        LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop") + "ms");
+        if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+            DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
     }
 
     @SuppressWarnings("deprecation")
@@ -114,27 +123,30 @@
     private void addJob(PregelixJob job) {
         if (followingBuild)
             job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
-        jobs.add(job);
+        pregelixJobs.add(job);
         followingBuild = false;
     }
 
     public void runGenomix(GenomixJobConf conf) throws NumberFormatException, HyracksException, Exception {
+        LOG.info("Starting Genomix Assembler Pipeline...");
+        GenomixJobConf.tick("runGenomix");
+        
         DriverUtils.updateCCProperties(conf);
+        numCoresPerMachine = conf.get(GenomixJobConf.HYRACKS_IO_DIRS).split(",").length;
+        numMachines = conf.get(GenomixJobConf.HYRACKS_SLAVES).split("\r?\n|\r").length;  // split on newlines
         GenomixJobConf.setGlobalStaticConstants(conf);
         followingBuild = Boolean.parseBoolean(conf.get(GenomixJobConf.FOLLOWS_GRAPH_BUILD));
-        jobs = new ArrayList<PregelixJob>();
+        pregelixJobs = new ArrayList<PregelixJob>();
         stepNum = 0;
-        boolean dump = false;
-        final boolean runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
-        if (runLocal)
-            GenomixMiniCluster.init(conf);
-        DriverUtils.addClusterShutdownHook(runLocal); // *always* shut down any CCs or NCs we start
+        runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
+        manager = new GenomixClusterManager(runLocal, conf);
+        manager.stopCluster(ClusterType.HYRACKS); // shut down any existing NCs and CCs
 
         String localInput = conf.get(GenomixJobConf.LOCAL_INPUT_DIR);
         if (localInput != null) {
             conf.set(GenomixJobConf.INITIAL_INPUT_DIR, conf.get(GenomixJobConf.HDFS_WORK_PATH) + File.separator
                     + "00-initial-input-from-genomix-driver");
-            DriverUtils.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
+            GenomixClusterManager.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
         }
         curOutput = conf.get(GenomixJobConf.INITIAL_INPUT_DIR);
 
@@ -193,53 +205,49 @@
                     setOutput(conf, Patterns.SCAFFOLD);
                     addJob(ScaffoldingVertex.getConfiguredJob(conf, ScaffoldingVertex.class));
                     break;
-                case STATS:
-                    DriverUtils.drawStatistics(conf, curOutput, "coverage.png");
-                    break;
                 case DUMP_FASTA:
-                    dump = true;
+                    DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
                     break;
             }
         }
 
-        if (jobs.size() > 0) {
-            DriverUtils.shutdownNCs();
-            DriverUtils.shutdownCC();
-            DriverUtils.startCC();
-            DriverUtils.startNCs(NCTypes.PREGELIX);
+        if (pregelixJobs.size() > 0) {
+            manager.startCluster(ClusterType.PREGELIX);
             pregelixDriver = new edu.uci.ics.pregelix.core.driver.Driver(this.getClass());
-        }
-        // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
-        // this would let them resume at arbitrary points of the pipeline
-        if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
-            for (int i = 0; i < jobs.size(); i++) {
-                LOG.info("Starting job " + jobs.get(i).getJobName());
-                GenomixJobConf.tick("pregelix-job");
-
-                pregelixDriver.runJob(jobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
+            // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
+            // this would let them resume at arbitrary points of the pipeline
+            if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
+                for (int i = 0; i < pregelixJobs.size(); i++) {
+                    LOG.info("Starting job " + pregelixJobs.get(i).getJobName());
+                    GenomixJobConf.tick("pregelix-job");
+                    
+                    pregelixDriver.runJob(pregelixJobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
+                            Integer.parseInt(conf.get(GenomixJobConf.PORT)));
+                    
+                    LOG.info("Finished job " + pregelixJobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+                }
+            } else {
+                LOG.info("Starting pregelix job series...");
+                GenomixJobConf.tick("pregelix-runJobs");
+                pregelixDriver.runJobs(pregelixJobs, conf.get(GenomixJobConf.IP_ADDRESS),
                         Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-
-                LOG.info("Finished job " + jobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+                LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
             }
-        } else {
-            LOG.info("Starting pregelix job series...");
-            GenomixJobConf.tick("pregelix-runJobs");
-            pregelixDriver.runJobs(jobs, conf.get(GenomixJobConf.IP_ADDRESS),
-                    Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-            LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
+            manager.stopCluster(ClusterType.PREGELIX);
         }
 
         if (conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR) != null)
-            DriverUtils.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
-        if (dump)
-            DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
+            GenomixClusterManager.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
         if (conf.get(GenomixJobConf.FINAL_OUTPUT_DIR) != null)
             FileSystem.get(conf).rename(new Path(curOutput), new Path(GenomixJobConf.FINAL_OUTPUT_DIR));
-
+        
+        LOG.info("Finished the Genomix Assembler Pipeline in " + GenomixJobConf.tock("runGenomix") + "ms!");
     }
 
     public static void main(String[] args) throws CmdLineException, NumberFormatException, HyracksException, Exception {
-        String[] myArgs = { "-kmerLength", "5", "-coresPerMachine", "2",
+        String[] myArgs = {
+                "-runLocal", "true",
+                "-kmerLength", "5",
                 //                        "-saveIntermediateResults", "true",
                 //                        "-localInput", "../genomix-pregelix/data/input/reads/synthetic/",
                 "-localInput", "../genomix-pregelix/data/input/reads/pathmerge",
@@ -251,14 +259,16 @@
                 //                            "-pipelineOrder", "BUILD,MERGE",
                 //                            "-inputDir", "/home/wbiesing/code/hyracks/genomix/genomix-driver/graphbuild.binmerge",
                 //                "-localInput", "../genomix-pregelix/data/TestSet/PathMerge/CyclePath/bin/part-00000", 
-                "-pipelineOrder", "BUILD_HYRACKS,MERGE" };
+                "-pipelineOrder", "MERGE" };
+        // allow Eclipse to run the maven-generated scripts
                 if (System.getProperty("app.home") == null)
-                    System.setProperty("app.home", "/home/wbiesing/code/hyracks/genomix/genomix-driver/target/appassembler");
+                    System.setProperty("app.home", new File("target/appassembler").getAbsolutePath());
 
         //        Patterns.BUILD, Patterns.MERGE, 
         //        Patterns.TIP_REMOVE, Patterns.MERGE,
         //        Patterns.BUBBLE, Patterns.MERGE,
-        GenomixJobConf conf = GenomixJobConf.fromArguments(args);
+//        GenomixJobConf conf = GenomixJobConf.fromArguments(args);
+          GenomixJobConf conf = GenomixJobConf.fromArguments(myArgs);
         GenomixDriver driver = new GenomixDriver();
         driver.runGenomix(conf);
     }
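The removed -coresPerMachine flag is now inferred from the cluster config: one Hyracks partition per IO device listed in the properties, and one machine per line of the slaves file, with their product used as the Hadoop reducer count. A toy sketch of that derivation, with hypothetical literal values standing in for the conf lookups:

    public class PartitionMathSketch {
        public static void main(String[] args) {
            // hypothetical stand-ins for conf.get(GenomixJobConf.HYRACKS_IO_DIRS) and .HYRACKS_SLAVES
            String ioDirs = "/mnt/data1/io,/mnt/data2/io";
            String slaves = "node01\r\nnode02\nnode03";

            int numCoresPerMachine = ioDirs.split(",").length;     // 2: one partition per IO device
            int numMachines = slaves.split("\r?\n|\r").length;     // 3: one machine per slaves line
            System.out.println(numCoresPerMachine * numMachines);  // 6: Hadoop reducer count
        }
    }

The "\r?\n|\r" split keeps the machine count correct even if conf/slaves was saved with Windows or old-Mac line endings.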
diff --git a/genomix/genomix-driver/src/main/resources/conf/cluster.properties b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
index 66251be..a083848 100644
--- a/genomix/genomix-driver/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
@@ -1,11 +1,14 @@
 #The CC port for Hyracks clients
+# don't change unless you want multiple CCs on one machine
 CC_CLIENTPORT=3099
 
 #The CC port for Hyracks cluster management
+# don't change unless you want multiple CCs on one machine
 CC_CLUSTERPORT=1099
 
-#The directory of hyracks binaries
-HYRACKS_HOME="../../../../hyracks"
+#Sets the http port for the Cluster Controller (default: 16001)
+# don't change unless you want multiple CCs on one machine
+CC_HTTPPORT=16001
 
 WORKPATH=""
 #The tmp directory for cc to install jars
@@ -30,10 +33,16 @@
 CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
 
 #The frame size of the internal dataflow engine
-FRAME_SIZE=65536
+FRAME_SIZE=65535
+
+#The frame limit of the internal dataflow engine
+FRAME_LIMIT=4096
+
+#The number of jobs to keep logs for
+JOB_HISTORY_SIZE=50
 
 #CC JAVA_OPTS
-CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx5g -Djava.util.logging.config.file=logging.properties"
 # Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
 
 #NC JAVA_OPTS
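A rough sanity check on the new buffer settings, assuming FRAME_LIMIT caps how many FRAME_SIZE-byte frames a single in-memory operator may buffer (an assumption about Hyracks' usage, not something this diff states): the two values above put each operator's ceiling just under 256MB.

    public class FrameBudgetSketch {
        public static void main(String[] args) {
            long frameSize = 65535;  // FRAME_SIZE above, bytes per frame
            long frameLimit = 4096;  // FRAME_LIMIT above, frames per operator (assumed meaning)
            System.out.printf("%.1f MB%n", frameSize * frameLimit / (1024.0 * 1024.0)); // 256.0 MB
        }
    }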
diff --git a/genomix/genomix-driver/src/main/resources/scripts/getip.sh b/genomix/genomix-driver/src/main/resources/scripts/getip.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
new file mode 100755
index 0000000..83a2fd5
--- /dev/null
+++ b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+set -x
+
+GENOMIX_HOME="$( dirname "$( cd "$(dirname "$0")" ; pwd -P )" )"  # script's parent dir's parent
+cd "$GENOMIX_HOME"
+
+. conf/cluster.properties
+. conf/stores.properties
+
+for i in `cat conf/slaves`
+do
+   DIRS=`echo $store | tr "," " "`
+   # ssh to the slave machine and create its store directories
+   ssh -n $i "mkdir -p $DIRS"
+
+   DIRS=`echo $IO_DIRS | tr "," " "`
+   ssh -n $i "mkdir -p $DIRS"
+done
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
old mode 100644
new mode 100755
index cf7a0b4..8f160b9
--- a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
@@ -14,7 +14,7 @@
 . conf/cluster.properties
 #Get the IP address of the cc
 CCHOST_NAME=`cat conf/master`
-CCHOST=`bin/getip.sh`
+CCHOST=`ssh -n ${CCHOST_NAME} "${GENOMIX_HOME}/bin/getip.sh"`
 
 #Remove the temp dir
 #rm -rf $CCTMP_DIR
@@ -29,15 +29,30 @@
 export JAVA_OPTS=$CCJAVA_OPTS
 
 cd $CCTMP_DIR
-#Launch hyracks cc script
+#Prepare cc script
+CMD="\"${GENOMIX_HOME}/bin/genomixcc\" -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST"
+
+if [ -n "$CC_CLIENTPORT" ]; then
+  CMD="$CMD -client-net-port $CC_CLIENTPORT"
+fi
+if [ -n "$CC_CLUSTERPORT" ]; then
+  CMD="$CMD -cluster-net-port $CC_CLUSTERPORT"
+fi
+if [ -n "$CC_HTTPPORT" ]; then
+  CMD="$CMD -http-port $CC_HTTPPORT"
+fi
+if [ -n "$JOB_HISTORY_SIZE" ]; then
+  CMD="$CMD -job-history-size $JOB_HISTORY_SIZE"
+fi
 if [ -f "${GENOMIX_HOME}/conf/topology.xml"  ]; then
-#Launch hyracks cc script with topology
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "${GENOMIX_HOME}/conf/topology.xml" &> "$CCLOGS_DIR"/cc.log &
-else
-#Launch hyracks cc script without topology
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> "$CCLOGS_DIR"/cc.log &
+CMD="$CMD -cluster-topology \"${GENOMIX_HOME}/conf/topology.xml\""
 fi
 
+#Launch cc script
+printf "\n\n\n********************************************\nStarting CC with command %s\n\n" "$CMD" >> "$CCLOGS_DIR"/cc.log
+eval "$CMD &>> \"$CCLOGS_DIR\"/cc.log &"
+
+# save the PID of the process we just launched
 PID=$!
 echo "master: "`hostname`$'\t'$PID
-echo $PID > "$GENOMIX_HOME"/conf/cc.pid
+echo $PID > "$GENOMIX_HOME/conf/cc.pid"
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
old mode 100644
new mode 100755
index 448b17d..ea926e5
--- a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
@@ -57,8 +57,12 @@
   exit 1
 fi
 
-#Launch hyracks nc
-"${GENOMIX_HOME}"/bin/$NCTYPE -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> "$NCLOGS_DIR"/$NODEID.log &
+CMD="\"${GENOMIX_HOME}/bin/$NCTYPE\" -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices \"${IO_DIRS}\""
+
+printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "$CMD" >> "$NCLOGS_DIR"/$NODEID.log
+
+#Launch nc
+eval "$CMD &>> \"$NCLOGS_DIR\"/$NODEID.log &"
 
 echo $!  # write PID of bg'ed script
 
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
old mode 100644
new mode 100755
index 41efa0f..ff9dfc4
--- a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
@@ -17,7 +17,8 @@
   echo "Stopped CC on master: "`hostname`$'\t'$PID
 fi
 
-#Clean up CC temp dir
-rm -rf $CCTMP_DIR/*
+#Clean up CC temp dir but keep the default logs directory
+shopt -s extglob
+rm -rf $CCTMP_DIR/!(logs)
 
 
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
old mode 100644
new mode 100755
index 4007089..14df6ba
--- a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
@@ -17,5 +17,6 @@
 	rm -rf $io_dir/*
 done
 
-#Clean up NC temp dir
-rm -rf $NCTMP_DIR/*
+#Clean up NC temp dir but keep the default logs directory
+shopt -s extglob
+rm -rf $NCTMP_DIR/!(logs)
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
index 39e204b..b3da280 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
@@ -167,7 +167,8 @@
 
         String ipAddress = jobConf.get(GenomixJobConf.IP_ADDRESS);
         int port = Integer.parseInt(jobConf.get(GenomixJobConf.PORT));
-        int numOfDuplicate = jobConf.getInt(GenomixJobConf.CORES_PER_MACHINE, 4);
+        String IODirs = jobConf.get(GenomixJobConf.HYRACKS_IO_DIRS, null);
+        int numOfDuplicate = IODirs != null ? IODirs.split(",").length : 4;
         boolean bProfiling = jobConf.getBoolean(GenomixJobConf.PROFILE, true);
         jobConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         jobConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
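The Hyracks driver applies the same one-partition-per-IO-device rule, but degrades to the old hard-coded default when the property is unset. A sketch of that guard as a standalone helper (class and method names here are hypothetical):

    public class IoDirPartitionsSketch {
        // hypothetical helper mirroring the guard above
        static int numPartitions(String ioDirs) {
            return ioDirs != null ? ioDirs.split(",").length : 4; // old hard-coded default
        }

        public static void main(String[] args) {
            System.out.println(numPartitions("/d1,/d2,/d3")); // 3
            System.out.println(numPartitions(null));          // 4
        }
    }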
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
index ffdb92e..56b59c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
@@ -80,6 +80,8 @@
 
     protected ConfFactory hadoopJobConfFactory;
     protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private static final int DEFAULT_FRAME_LIMIT = 4096;
+    private static final int DEFAULT_FRAME_SIZE = 65535;
     protected String[] ncNodeNames;
     protected String[] readSchedule;
 
@@ -236,10 +238,10 @@
     protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
         Configuration conf = confFactory.getConf();
         kmerSize = Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH));
-        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
-        System.out.println(GenomixJobConf.DEFAULT_FRAME_SIZE);
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, DEFAULT_FRAME_LIMIT);
+//        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, DEFAULT_FRAME_SIZE);
+        System.out.println(DEFAULT_FRAME_SIZE);
         System.out.println(frameSize);
         String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         groupbyType = GroupbyType.PRECLUSTER;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 97f1d5c..9edef85 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -76,6 +76,13 @@
         if (maxIteration < 0)
             maxIteration = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.GRAPH_CLEAN_MAX_ITERATIONS));
         GenomixJobConf.setGlobalStaticConstants(getContext().getConfiguration());
+
+        
+//        if (getSuperstep() == 1) {
+//            kmerSize = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.KMER_LENGTH));
+//            maxIteration = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.GRAPH_CLEAN_MAX_ITERATIONS));
+//            GenomixJobConf.setGlobalStaticConstants(getContext().getConfiguration());
+//        }
     }
     
     /**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 0c5a0e7..108bb2a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -47,8 +47,8 @@
         receivedMsgList.clear();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index b25b0a1..94a373e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -82,8 +82,8 @@
         }
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 47a6e04..66b98a8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -73,8 +73,8 @@
         tmpValue.reset();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
index 637b233..14c2d9d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
@@ -50,8 +50,8 @@
         }
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
index 83872be..04682cb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
@@ -44,8 +44,8 @@
         super.initVertex();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         if(getSuperstep() == 1)
             ScaffoldingAggregator.preScaffoldingMap.clear();
         else if(getSuperstep() == 2)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 553cefc..4c8bbec 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -97,8 +97,8 @@
             tmpOutgoingEdge = new EdgeWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 0c8ebf5..56abdd4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -41,8 +41,8 @@
             destVertexId = new VKmerBytesWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
index c345ac8..5b3e219 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
@@ -43,8 +43,8 @@
             repeatKmer = new VKmerBytesWritable();
         if(getSuperstep() == 1)
             StatisticsAggregator.preGlobalCounters.clear();
-        else
-            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+//        else
+//            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
         counters.clear();
         getVertexValue().getCounters().clear();
     }
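The last eight files all receive the same two-line change: the superstep-1 branch still clears the aggregator's carried-over counters, while the read-back of the previous superstep's aggregated result is commented out rather than deleted. A minimal sketch of the resulting initVertex shape, with hypothetical names standing in for the Pregelix API and the StatisticsAggregator state:

    import java.util.HashMap;
    import java.util.Map;

    public class InitVertexSketch {
        // hypothetical stand-ins for StatisticsAggregator.preGlobalCounters and the vertex's counters
        static Map<String, Long> preGlobalCounters = new HashMap<String, Long>();
        static Map<String, Long> counters = new HashMap<String, Long>();

        static void initVertex(long superstep) {
            if (superstep == 1)
                preGlobalCounters.clear(); // fresh pipeline stage: drop stale aggregates
            // the else-branch (reading back the previous superstep's aggregated
            // counters) is commented out in every operator above, not deleted
            counters.clear(); // per-superstep counters always start empty
        }

        public static void main(String[] args) {
            initVertex(1);
            initVertex(2);
        }
    }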