Merge branch 'anbangx/fullstack_genomix' into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/pom.xml b/genomix/genomix-data/pom.xml
index 18500cc..d164b0c 100644
--- a/genomix/genomix-data/pom.xml
+++ b/genomix/genomix-data/pom.xml
@@ -50,6 +50,11 @@
<type>maven-plugin</type>
</dependency>
<dependency>
+ <groupId>jfree</groupId>
+ <artifactId>jfreechart</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
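
The new jfreechart dependency backs the coverage-plot helper added in DriverUtils below. A minimal, self-contained sketch of the chart-to-PNG flow it provides (API as in jfreechart 1.0.13; the data points are illustrative):

```java
import java.io.File;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.xy.XYSeries;
import org.jfree.data.xy.XYSeriesCollection;

public class CoverageChartSketch {
    public static void main(String[] args) throws Exception {
        XYSeries series = new XYSeries("Kmer Coverage");
        series.add(1, 12);   // illustrative (coverage, count) pairs
        series.add(2, 40);
        series.add(3, 17);
        JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer", "Coverage", "Count",
                new XYSeriesCollection(series), PlotOrientation.VERTICAL, true, true, false);
        ChartUtilities.saveChartAsPNG(new File("coverage.png"), chart, 800, 600);
    }
}
```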
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
index cfe4dfd..63758f2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
@@ -79,6 +79,11 @@
@Option(name = "-followsGraphBuild", usage = "whether or not the given input is output from a previous graph-build", required = false)
private boolean followsGraphBuild = false;
+ @Option(name = "-clusterWaitTime", usage = "the amount of time (in ms) to wait between starting/stopping CC/NC", required = false)
+ private int clusterWaitTime = -1;
+
+ @Option(name = "-drawStatistics", usage = "Plot coverage statistics after graphbuilding stages", required = false)
+ private boolean drawStatistics = false;
// Graph cleaning
@Option(name = "-bridgeRemove_maxLength", usage = "Nodes with length <= bridgeRemoveLength that bridge separate paths are removed from the graph", required = false)
@@ -115,9 +120,6 @@
@Option(name = "-profile", usage = "Whether or not to do runtime profifling", required = false)
private boolean profile = false;
- @Option(name = "-coresPerMachine", usage="the number of cores available in each machine", required=false)
- private int coresPerMachine = -1;
-
@Option(name = "-runLocal", usage = "Run a local instance using the Hadoop MiniCluster. NOTE: overrides settings for -ip and -port and those in conf/*.properties", required=false)
private boolean runLocal = false;
@@ -143,7 +145,6 @@
TIP_REMOVE,
SCAFFOLD,
SPLIT_REPEAT,
- STATS,
DUMP_FASTA;
/**
@@ -176,6 +177,8 @@
public static final String LOCAL_OUTPUT_DIR = "genomix.final.local.output.dir";
public static final String SAVE_INTERMEDIATE_RESULTS = "genomix.save.intermediate.results";
public static final String FOLLOWS_GRAPH_BUILD = "genomix.follows.graph.build";
+ public static final String CLUSTER_WAIT_TIME = "genomix.cluster.wait.time";
+ public static final String DRAW_STATISTICS = "genomix.draw.statistics";
// Graph cleaning
public static final String BRIDGE_REMOVE_MAX_LENGTH = "genomix.bridgeRemove.maxLength";
@@ -194,31 +197,11 @@
public static final String RUN_LOCAL = "genomix.runLocal";
// TODO should these also be command line options?
- public static final String CORES_PER_MACHINE = "genomix.driver.duplicate.num";
public static final String FRAME_SIZE = "genomix.framesize";
public static final String FRAME_LIMIT = "genomix.framelimit";
- public static final String TABLE_SIZE = "genomix.tablesize";
public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
public static final String OUTPUT_FORMAT = "genomix.graph.output";
- /** Configurations used by hybrid groupby function in graph build phrase */
- public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
- public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
- public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
- public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
- public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
- public static final int DEFAULT_FRAME_SIZE = 65536;
- public static final int DEFAULT_FRAME_LIMIT = 65536;
- public static final int DEFAULT_TABLE_SIZE = 10485767;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
- public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
-
- public static final String GROUPBY_TYPE_HYBRID = "hybrid";
- public static final String GROUPBY_TYPE_EXTERNAL = "external";
public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
@@ -227,16 +210,21 @@
public static final String OUTPUT_FORMAT_BINARY = "genomix.outputformat.binary";
public static final String OUTPUT_FORMAT_TEXT = "genomix.outputformat.text";
public static final String HDFS_WORK_PATH = "genomix.hdfs.work.path";
+ public static final String HYRACKS_IO_DIRS = "genomix.hyracks.IO_DIRS";
+ public static final String HYRACKS_SLAVES = "genomix.hyracks.slaves.list";
+
private static final Patterns[] DEFAULT_PIPELINE_ORDER = {
Patterns.BUILD, Patterns.MERGE,
Patterns.LOW_COVERAGE, Patterns.MERGE,
Patterns.TIP_REMOVE, Patterns.MERGE,
-// Patterns.BUBBLE, Patterns.MERGE,
-// Patterns.SPLIT_REPEAT, Patterns.MERGE,
-// Patterns.SCAFFOLD, Patterns.MERGE
+ Patterns.BUBBLE, Patterns.MERGE,
+ Patterns.SPLIT_REPEAT, Patterns.MERGE,
+ Patterns.SCAFFOLD, Patterns.MERGE
};
+
private String[] extraArguments = {};
private static Map<String, Long> tickTimes = new HashMap<String, Long>();
@@ -355,10 +343,13 @@
set(HDFS_WORK_PATH, "genomix_out"); // should be in the user's home directory?
// hyracks-specific
- if (getInt(CORES_PER_MACHINE, -1) == -1)
- setInt(CORES_PER_MACHINE, 4);
- if (getInt(FRAME_SIZE, -1) == -1)
- setInt(FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ if (getInt(CLUSTER_WAIT_TIME, -1) == -1)
+ setInt(CLUSTER_WAIT_TIME, 6000);
+
+ // normalize DRAW_STATISTICS to an explicit value (defaults to false)
+ setBoolean(DRAW_STATISTICS, getBoolean(DRAW_STATISTICS, false));
// if (getBoolean(RUN_LOCAL, false)) {
// // override any other settings for HOST and PORT
@@ -389,12 +380,9 @@
set(HDFS_WORK_PATH, opts.hdfsWorkPath);
setBoolean(SAVE_INTERMEDIATE_RESULTS, opts.saveIntermediateResults);
setBoolean(FOLLOWS_GRAPH_BUILD, opts.followsGraphBuild);
+ setInt(CLUSTER_WAIT_TIME, opts.clusterWaitTime);
+ setBoolean(DRAW_STATISTICS, opts.drawStatistics);
-
-// if (opts.runLocal && (opts.ipAddress != null || opts.port != -1))
-// throw new IllegalArgumentException("Option -runLocal cannot be set at the same time as -port or -ip! (-runLocal starts a cluster; -ip and -port specify an existing cluster)");
- if (opts.runLocal)
- throw new IllegalArgumentException("runLocal is currently unsupported!");
setBoolean(RUN_LOCAL, opts.runLocal);
// Hyracks/Pregelix Setup
@@ -402,7 +390,6 @@
set(IP_ADDRESS, opts.ipAddress);
setInt(PORT, opts.port);
setBoolean(PROFILE, opts.profile);
- setInt(CORES_PER_MACHINE, opts.coresPerMachine);
// Graph cleaning
setInt(BRIDGE_REMOVE_MAX_LENGTH, opts.bridgeRemove_maxLength);
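
The `@Option` fields above are args4j annotations parsed by `GenomixJobConf.fromArguments`; a hedged sketch of how the two new flags flow into the conf (argument values illustrative):

```java
// -drawStatistics is a bare boolean flag; -clusterWaitTime takes a value in ms
String[] args = { "-kmerLength", "21", "-clusterWaitTime", "10000", "-drawStatistics" };
GenomixJobConf conf = GenomixJobConf.fromArguments(args);

// the option values land in the Hadoop-style conf under the new keys
int waitMs = Integer.parseInt(conf.get(GenomixJobConf.CLUSTER_WAIT_TIME));      // 10000
boolean draw = Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS));  // true
```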
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
new file mode 100644
index 0000000..20bfceb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/DriverUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.jfree.chart.ChartFactory;
+import org.jfree.chart.ChartUtilities;
+import org.jfree.chart.JFreeChart;
+import org.jfree.chart.plot.PlotOrientation;
+import org.jfree.data.xy.XYDataset;
+import org.jfree.data.xy.XYSeries;
+import org.jfree.data.xy.XYSeriesCollection;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DriverUtils {
+
+ public static final Log LOG = LogFactory.getLog(DriverUtils.class);
+
+ /*
+ * Get the IP address of the master node using the bin/getip.sh script
+ */
+ public static String getIP(String hostName) throws IOException, InterruptedException {
+ String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
+ + File.separator + "getip.sh\"";
+ Process p = Runtime.getRuntime().exec(getIPCmd);
+ p.waitFor(); // wait for ssh
+ String stdout = IOUtils.toString(p.getInputStream()).trim();
+ if (p.exitValue() != 0)
+ throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
+ + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
+ return stdout;
+ }
+
+ /**
+ * set the CC's IP address and port from the cluster.properties and `getip.sh` script
+ */
+ public static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
+ Properties CCProperties = new Properties();
+ CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
+ + File.separator + "cluster.properties"));
+ if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
+ conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
+ if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
+ conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
+ }
+ if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
+ conf.set(GenomixJobConf.FRAME_SIZE, CCProperties.getProperty("FRAME_SIZE"));
+ if (conf.get(GenomixJobConf.FRAME_LIMIT) == null)
+ conf.set(GenomixJobConf.FRAME_LIMIT, CCProperties.getProperty("FRAME_LIMIT"));
+ if (conf.get(GenomixJobConf.HYRACKS_IO_DIRS) == null)
+ conf.set(GenomixJobConf.HYRACKS_IO_DIRS, CCProperties.getProperty("IO_DIRS"));
+ if (conf.get(GenomixJobConf.HYRACKS_SLAVES) == null) {
+ String slaves = FileUtils.readFileToString(new File(System.getProperty("app.home", ".") + File.separator + "conf"
+ + File.separator + "slaves"));
+ conf.set(GenomixJobConf.HYRACKS_SLAVES, slaves);
+ }
+ }
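+ // Note: the driver derives its parallelism from the two values read above,
+ // roughly (see GenomixDriver.runGenomix below):
+ // numCoresPerMachine = conf.get(HYRACKS_IO_DIRS).split(",").length;
+ // numMachines = conf.get(HYRACKS_SLAVES).split("\r?\n|\r").length;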
+
+ public static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
+ LOG.info("Getting coverage statistics...");
+ GenomixJobConf.tick("drawStatistics");
+ FileSystem dfs = FileSystem.get(conf);
+
+ // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
+ SequenceFile.Reader reader = null;
+ VKmerBytesWritable key = null;
+ NodeWritable value = null;
+ TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
+ FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
+ for (FileStatus f : files) {
+ if (f.getLen() != 0) {
+ try {
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null)
+ break;
+ Integer cov = java.lang.Math.round(value.getAverageCoverage());
+ Long count = coverageCounts.get(cov);
+ if (count == null)
+ coverageCounts.put(cov, new Long(1));
+ else
+ coverageCounts.put(cov, count + 1);
+ }
+ } catch (Exception e) {
+ System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+ }
+ }
+
+ XYSeries series = new XYSeries("Kmer Coverage");
+ for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
+ series.add(pair.getKey().floatValue(), pair.getValue().longValue());
+ }
+ XYDataset xyDataset = new XYSeriesCollection(series);
+ JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
+ "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
+
+ // Write the data to the output stream:
+ FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
+ ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
+ chartOut.flush();
+ chartOut.close();
+ LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
+ }
+
+ public static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
+ throws IOException {
+ LOG.info("Dumping graph to fasta...");
+ GenomixJobConf.tick("dumpGraph");
+ FileSystem dfs = FileSystem.get(conf);
+
+ // stream the graph in, writing each node out as a FASTA record... this would be better as a hadoop job
+ SequenceFile.Reader reader = null;
+ VKmerBytesWritable key = null;
+ NodeWritable value = null;
+ BufferedWriter bw = null;
+ FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
+ for (FileStatus f : files) {
+ if (f.getLen() != 0) {
+ try {
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ if (bw == null)
+ bw = new BufferedWriter(new FileWriter(outputFasta));
+ while (reader.next(key, value)) {
+ if (key == null || value == null)
+ break;
+ bw.write(">node_" + key.toString() + "\n");
+ bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
+ bw.newLine();
+ }
+ } catch (Exception e) {
+ System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+ }
+ }
+ if (bw != null)
+ bw.close();
+ LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
+ }
+
+}
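
For orientation, a sketch of how the driver invokes these helpers after a build stage (mirrors the call sites in GenomixDriver below; `curOutput` and `followingBuild` are the driver's fields):

```java
// after a graph-build stage leaves its output in `curOutput` on HDFS:
if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
    DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");

// and at the DUMP_FASTA step:
DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
```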
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
new file mode 100644
index 0000000..a3e1e0a
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixClusterManager.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.minicluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.config.GenomixJobConf;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
+
+/**
+ * Manages the clusters the driver depends on (Hyracks, Pregelix, and Hadoop),
+ * either as local in-memory mini-clusters or as remote clusters started and
+ * stopped via the bin/ scripts, and registers shutdown hooks for cleanup.
+ */
+public class GenomixClusterManager {
+
+ public enum ClusterType {
+ HYRACKS,
+ PREGELIX,
+ HADOOP
+ }
+
+ private static final Log LOG = LogFactory.getLog(GenomixClusterManager.class);
+ public static final String LOCAL_HOSTNAME = "localhost";
+ public static final String LOCAL_IP = "127.0.0.1";
+ public static final int LOCAL_HYRACKS_CLIENT_PORT = 3099;
+ public static final int LOCAL_HYRACKS_CC_PORT = 1099;
+ public static final int LOCAL_PREGELIX_CLIENT_PORT = 3097;
+ public static final int LOCAL_PREGELIX_CC_PORT = 1097;
+
+ private ClusterControllerService localHyracksCC;
+ private NodeControllerService localHyracksNC;
+ private ClusterControllerService localPregelixCC;
+ private NodeControllerService localPregelixNC;
+ private MiniDFSCluster localDFSCluster;
+ private MiniMRCluster localMRCluster;
+
+ private final boolean runLocal;
+ private final GenomixJobConf conf;
+ private boolean jarsCopiedToHadoop = false;
+
+ private HashMap<ClusterType, Thread> shutdownHooks = new HashMap<ClusterType, Thread>();
+
+ public GenomixClusterManager(boolean runLocal, GenomixJobConf conf) {
+ this.runLocal = runLocal;
+ this.conf = conf;
+ }
+
+ /**
+ * Start a cluster of the given type. If runLocal is specified, we will create an in-memory version of the cluster.
+ */
+ public void startCluster(ClusterType clusterType) throws Exception {
+ addClusterShutdownHook(clusterType);
+ switch (clusterType) {
+ case HYRACKS:
+ case PREGELIX:
+ if (runLocal) {
+ startLocalCC(clusterType);
+ startLocalNC(clusterType);
+ } else {
+ int sleepms = Integer.parseInt(conf.get(GenomixJobConf.CLUSTER_WAIT_TIME));
+ startCC(sleepms);
+ startNCs(clusterType, sleepms);
+ }
+ break;
+ case HADOOP:
+ if (runLocal)
+ startLocalMRCluster();
+ else
+ deployJarsToHadoop();
+ break;
+ }
+ }
+
+ public void stopCluster(ClusterType clusterType) throws Exception {
+ switch (clusterType) {
+ case HYRACKS:
+ if (runLocal) {
+ if (localHyracksCC != null) {
+ localHyracksCC.stop();
+ localHyracksCC = null;
+ }
+ if (localHyracksNC != null) {
+ localHyracksNC.stop();
+ localHyracksNC = null;
+ }
+ } else {
+ shutdownCC();
+ shutdownNCs();
+ }
+ break;
+ case PREGELIX:
+ if (runLocal) {
+ if (localPregelixCC != null) {
+ localPregelixCC.stop();
+ localPregelixCC = null;
+ }
+ if (localPregelixNC != null) {
+ localPregelixNC.stop();
+ localPregelixNC = null;
+
+ }
+ } else {
+ shutdownCC();
+ shutdownNCs();
+ }
+ break;
+ case HADOOP:
+ if (runLocal) {
+ if (localMRCluster != null) {
+ localMRCluster.shutdown();
+ localMRCluster = null;
+ }
+ if (localDFSCluster != null) {
+ localDFSCluster.shutdown();
+ localDFSCluster = null;
+ }
+ }
+ break;
+ }
+ removeClusterShutdownHook(clusterType);
+ }
+
+ private void startLocalCC(ClusterType clusterType) throws Exception {
+ LOG.info("Starting local CC...");
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = LOCAL_HOSTNAME;
+ ccConfig.clusterNetIpAddress = LOCAL_HOSTNAME;
+ ccConfig.defaultMaxJobAttempts = 0;
+ ccConfig.jobHistorySize = 1;
+ ccConfig.profileDumpPeriod = -1;
+
+ if (clusterType == ClusterType.HYRACKS) {
+ ccConfig.clusterNetPort = LOCAL_HYRACKS_CC_PORT;
+ ccConfig.clientNetPort = LOCAL_HYRACKS_CLIENT_PORT;
+ localHyracksCC = new ClusterControllerService(ccConfig);
+ localHyracksCC.start();
+ } else if (clusterType == ClusterType.PREGELIX) {
+ ccConfig.clusterNetPort = LOCAL_PREGELIX_CC_PORT;
+ ccConfig.clientNetPort = LOCAL_PREGELIX_CLIENT_PORT;
+ localPregelixCC = new ClusterControllerService(ccConfig);
+ localPregelixCC.start();
+ } else {
+ throw new IllegalArgumentException("Invalid CC type: " + clusterType);
+ }
+ }
+
+ private void startLocalNC(ClusterType clusterType) throws Exception {
+ LOG.info("Starting local NC...");
+ // ClusterConfig.setClusterPropertiesPath(System.getProperty("app.home") + "/conf/cluster.properties");
+ // ClusterConfig.setStorePath(...);
+ NCConfig ncConfig = new NCConfig();
+ ncConfig.ccHost = LOCAL_HOSTNAME;
+ ncConfig.clusterNetIPAddress = LOCAL_HOSTNAME;
+ ncConfig.dataIPAddress = LOCAL_IP;
+ ncConfig.datasetIPAddress = LOCAL_IP;
+ ncConfig.nodeId = "nc-" + clusterType;
+ ncConfig.ioDevices = "tmp" + File.separator + "t3" + File.separator + clusterType;
+
+ if (clusterType == ClusterType.HYRACKS) {
+ ncConfig.ccPort = LOCAL_HYRACKS_CC_PORT;
+ localHyracksNC = new NodeControllerService(ncConfig);
+ localHyracksNC.start();
+ } else if (clusterType == ClusterType.PREGELIX) {
+ ncConfig.ccPort = LOCAL_PREGELIX_CC_PORT;
+ ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
+ localPregelixNC = new NodeControllerService(ncConfig);
+ localPregelixNC.start();
+ } else {
+ throw new IllegalArgumentException("Invalid NC type: " + clusterType);
+ }
+ }
+
+ private void startLocalMRCluster() throws IOException {
+ LOG.info("Starting local DFS and MR cluster...");
+ localDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+ localMRCluster = new MiniMRCluster(1, localDFSCluster.getFileSystem().getUri().toString(), 1);
+ }
+
+ /**
+ * Walk the current CLASSPATH to get all jar's in use and copy them up to all HDFS nodes
+ *
+ * @throws IOException
+ */
+ private void deployJarsToHadoop() throws IOException {
+ if (!jarsCopiedToHadoop) {
+ LOG.info("Deploying jars in my classpath to HDFS Distributed Cache...");
+ FileSystem dfs = FileSystem.get(conf);
+ String[] classPath = { System.getenv().get("CLASSPATH"), System.getProperty("java.class.path") };
+ for (String cp : classPath) {
+ if (cp == null)
+ continue;
+ for (String item : cp.split(":")) {
+ // LOG.info("Checking " + item);
+ if (item.endsWith(".jar")) {
+ // LOG.info("Deploying " + item);
+ Path localJar = new Path(item);
+ Path jarDestDir = new Path(conf.get(GenomixJobConf.HDFS_WORK_PATH) + "/jar-dependencies");
+ // dist cache requires absolute paths. we have to use the working directory if HDFS_WORK_PATH is relative
+ if (!jarDestDir.isAbsolute()) {
+ // working dir is the correct base, but we must use the path version (not a URI). Get URI and strip out leading identifiers
+ String hostNameRE = "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*";
+ String[] workDirs = dfs.getWorkingDirectory().toString()
+ .split("(hdfs://" + hostNameRE + ":\\d+|file:)", 2);
+ if (workDirs.length <= 1) {
+ LOG.info("Weird.... didn't find a URI header matching hdfs://host:port or file: Just using the original instead.");
+ jarDestDir = new Path(dfs.getWorkingDirectory() + File.separator + jarDestDir);
+ } else {
+ jarDestDir = new Path(workDirs[1] + File.separator + jarDestDir);
+ }
+ }
+ dfs.mkdirs(jarDestDir);
+ Path destJar = new Path(jarDestDir + File.separator + localJar.getName());
+ dfs.copyFromLocalFile(localJar, destJar);
+ // LOG.info("Jar in distributed cache: " + destJar);
+ DistributedCache.addFileToClassPath(destJar, conf);
+ }
+ }
+ }
+ }
+ }
+
+ private static void startNCs(ClusterType type, int sleepms) throws IOException, InterruptedException {
+ LOG.info("Starting NC's");
+ String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+ + "startAllNCs.sh " + type;
+ Process p = Runtime.getRuntime().exec(startNCCmd);
+ p.waitFor(); // wait for ssh
+ Thread.sleep(sleepms); // wait for NC -> CC registration
+ if (p.exitValue() != 0)
+ throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
+ + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+ + IOUtils.toString(p.getErrorStream()));
+ }
+
+ private static void startCC(int sleepms) throws IOException, InterruptedException {
+ LOG.info("Starting CC");
+ String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+ + "startcc.sh";
+ Process p = Runtime.getRuntime().exec(startCCCmd);
+ p.waitFor(); // wait for cmd execution
+ Thread.sleep(sleepms); // wait for CC registration
+ if (p.exitValue() != 0)
+ throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
+ + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+ + IOUtils.toString(p.getErrorStream()));
+ }
+
+ private static void shutdownCC() throws IOException, InterruptedException {
+ LOG.info("Shutting down any previous CC");
+ String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
+ Process p = Runtime.getRuntime().exec(stopCCCmd);
+ p.waitFor(); // wait for cmd execution
+ }
+
+ private static void shutdownNCs() throws IOException, InterruptedException {
+ LOG.info("Shutting down any previous NC's");
+ String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+ + "stopAllNCs.sh";
+ Process p = Runtime.getRuntime().exec(stopNCCmd);
+ p.waitFor(); // wait for ssh
+ }
+
+ private void addClusterShutdownHook(final ClusterType clusterType) {
+ if (shutdownHooks.containsKey(clusterType))
+ throw new IllegalArgumentException("Already specified a hook for shutting down a " + clusterType
+ + " cluster! (Try removing the existing hook first?)");
+ Thread hook = new Thread() {
+ @Override
+ public void run() {
+ LOG.info("Shutting down the cluster...");
+ try {
+ stopCluster(clusterType);
+ } catch (Exception e) {
+ System.err.println("Error while shutting the cluster down:");
+ e.printStackTrace();
+ }
+ }
+ };
+ shutdownHooks.put(clusterType, hook);
+ Runtime.getRuntime().addShutdownHook(hook);
+ }
+
+ private void removeClusterShutdownHook(final ClusterType clusterType) {
+ if (!shutdownHooks.containsKey(clusterType))
+ // throw new IllegalArgumentException("There is no shutdown hook for " + clusterType + "!");
+ return; // ignore-- we are cleaning up after a previous run
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHooks.get(clusterType));
+ } catch (IllegalStateException e) {
+ // ignore: we must already be shutting down
+ }
+ }
+
+ public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
+ LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
+ GenomixJobConf.tick("copyLocalToHDFS");
+ FileSystem dfs = FileSystem.get(conf);
+ Path dest = new Path(destDir);
+ dfs.delete(dest, true);
+ dfs.mkdirs(dest);
+
+ File srcBase = new File(localDir);
+ if (srcBase.isDirectory())
+ for (File f : srcBase.listFiles())
+ dfs.copyFromLocalFile(new Path(f.toString()), dest);
+ else
+ dfs.copyFromLocalFile(new Path(localDir), dest);
+
+ LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
+ }
+
+ public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
+ LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
+ GenomixJobConf.tick("copyBinToLocal");
+ FileSystem dfs = FileSystem.get(conf);
+ FileUtils.deleteQuietly(new File(localDestDir));
+
+ // save original binary to output/bin
+ dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
+
+ // convert hdfs sequence files to text as output/text
+ BufferedWriter bw = null;
+ SequenceFile.Reader reader = null;
+ Writable key = null;
+ Writable value = null;
+ FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
+ for (FileStatus f : files) {
+ if (f.getLen() != 0 && !f.isDir()) {
+ try {
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ if (bw == null)
+ bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
+ while (reader.next(key, value)) {
+ if (key == null || value == null)
+ break;
+ bw.write(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ }
+ } catch (Exception e) {
+ System.out.println("Encountered an error copying " + f + " to local:\n" + e);
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+
+ }
+ }
+ if (bw != null)
+ bw.close();
+ LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
+ }
+}
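
The intended lifecycle is start/run/stop per backend, with a shutdown hook as a safety net; a minimal hedged usage sketch (error handling elided):

```java
GenomixJobConf conf = GenomixJobConf.fromArguments(args);
boolean runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
GenomixClusterManager manager = new GenomixClusterManager(runLocal, conf);

manager.startCluster(ClusterType.HYRACKS); // local CC+NC, or bin/startcc.sh + startAllNCs.sh
try {
    // ... submit Hyracks jobs against the cluster ...
} finally {
    manager.stopCluster(ClusterType.HYRACKS); // also removes the shutdown hook
}
```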
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
deleted file mode 100644
index 28d6572..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/minicluster/GenomixMiniCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package edu.uci.ics.genomix.minicluster;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.wicket.util.file.File;
-
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class GenomixMiniCluster {
-
- public static final String NC1_ID = "nc1";
- public static final int TEST_HYRACKS_CC_PORT = 1099;
- public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
- public static final String CC_HOST = "localhost";
-
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static MiniDFSCluster dfsCluster;
- private static Path clusterProperties;
- private static Path clusterStores;
- private static Path clusterWorkDir;
- private static Path clusterStoresDir;
-
- public static void init(GenomixJobConf conf) throws Exception {
- makeLocalClusterConfig();
- ClusterConfig.setClusterPropertiesPath(clusterProperties.toAbsolutePath().toString());
- ClusterConfig.setStorePath(clusterStores.toAbsolutePath().toString());
-// dfsCluster = new MiniDFSCluster(conf, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)), true, null);
-
- // cluster controller
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = CC_HOST;
- ccConfig.clusterNetIpAddress = CC_HOST;
- ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
- ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
- ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 1;
- ccConfig.profileDumpPeriod = -1;
- cc = new ClusterControllerService(ccConfig);
- cc.start();
-
- // one node controller
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.clusterNetIPAddress = "localhost";
- ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- ncConfig1.ioDevices = clusterWorkDir.toString() + File.separator + "tmp" + File.separator + "t3";
- ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
- }
-
- public static void deinit() throws Exception {
- nc1.stop();
- cc.stop();
-// dfsCluster.shutdown();
- FileUtils.deleteQuietly(new File("build"));
- FileUtils.deleteQuietly(clusterProperties.toFile());
- FileUtils.deleteQuietly(clusterStores.toFile());
- FileUtils.deleteQuietly(clusterWorkDir.toFile());
- FileUtils.deleteQuietly(clusterStoresDir.toFile());
- }
-
- /**
- * create the necessary .properties and directories for a minicluster instance
- */
- private static void makeLocalClusterConfig() throws IOException {
- clusterProperties = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.cluster", ".properties");
- clusterWorkDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterWorkDir");
- PrintWriter writer = new PrintWriter(clusterProperties.toString(), "US-ASCII");
- writer.println("CC_CLIENTPORT=" + TEST_HYRACKS_CC_CLIENT_PORT);
- writer.println("CC_CLUSTERPORT=" + TEST_HYRACKS_CC_PORT);
- writer.println(String.format("WORKPATH=%s", clusterWorkDir.toAbsolutePath().toString()));
- writer.println("CCTMP_DIR=${WORKPATH}/tmp/t1");
- writer.println("NCTMP_DIR=${WORKPATH}/tmp/t2");
- writer.println("CCLOGS_DIR=$CCTMP_DIR/logs");
- writer.println("NCLOGS_DIR=$NCTMP_DIR/logs");
- writer.println("IO_DIRS=${WORKPATH}/tmp/t3");
- writer.println("JAVA_HOME=$JAVA_HOME");
- writer.println("CLASSPATH=\"${HADOOP_HOME}:${CLASSPATH}:.\"");
- writer.println("FRAME_SIZE=65536");
- writer.println("CCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
- writer.println("NCJAVA_OPTS=\"-Xmx1g -Djava.util.logging.config.file=logging.properties\"");
- writer.close();
-
- clusterStoresDir = Files.createTempDirectory(FileSystems.getDefault().getPath("."), "tmp.clusterStores");
- clusterStores = Files.createTempFile(FileSystems.getDefault().getPath("."), "tmp.stores", ".properties");
- writer = new PrintWriter(clusterStores.toString(), "US-ASCII");
- writer.println(String.format("store=%s", clusterStoresDir.toString()));
- writer.close();
- }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index 758051d..968c492 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -331,7 +331,8 @@
for (EdgeWritable e : edges) {
VKmerBytesWritable key = e.getKey();
if (unionEdges.containsKey(key)) {
- unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+// unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+ unionEdges.get(key).unionUpdate(e.getReadIDs());
}
else {
unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
@@ -340,7 +341,8 @@
for (EdgeWritable e : other.edges) {
VKmerBytesWritable key = e.getKey();
if (unionEdges.containsKey(key)) {
- unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+// unionEdges.get(key).unionUpdateCappedCount(e.getReadIDs());
+ unionEdges.get(key).unionUpdate(e.getReadIDs());
}
else {
unionEdges.put(key, new PositionListWritable(e.getReadIDs())); // make a new copy of their list
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
deleted file mode 100644
index f3a4d6a..0000000
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
+++ /dev/null
@@ -1,316 +0,0 @@
-package edu.uci.ics.genomix.driver;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.jfree.chart.ChartFactory;
-import org.jfree.chart.ChartUtilities;
-import org.jfree.chart.JFreeChart;
-import org.jfree.chart.plot.PlotOrientation;
-import org.jfree.data.xy.XYDataset;
-import org.jfree.data.xy.XYSeries;
-import org.jfree.data.xy.XYSeriesCollection;
-
-import edu.uci.ics.genomix.config.GenomixJobConf;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class DriverUtils {
-
- enum NCTypes {
- HYRACKS,
- PREGELIX
- }
-
- private static final Log LOG = LogFactory.getLog(DriverUtils.class);
-
- /*
- * Get the IP address of the master node using the bin/getip.sh script
- */
- static String getIP(String hostName) throws IOException, InterruptedException {
- String getIPCmd = "ssh -n " + hostName + " \"" + System.getProperty("app.home", ".") + File.separator + "bin"
- + File.separator + "getip.sh\"";
- Process p = Runtime.getRuntime().exec(getIPCmd);
- p.waitFor(); // wait for ssh
- String stdout = IOUtils.toString(p.getInputStream()).trim();
- if (p.exitValue() != 0)
- throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
- + p.exitValue() + "\nstdout: " + stdout + "\nstderr: " + IOUtils.toString(p.getErrorStream()));
- return stdout;
- }
-
- /**
- * set the CC's IP address and port from the cluster.properties and `getip.sh` script
- */
- static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
- Properties CCProperties = new Properties();
- CCProperties.load(new FileInputStream(System.getProperty("app.home", ".") + File.separator + "conf"
- + File.separator + "cluster.properties"));
- if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
- conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
- if (Integer.parseInt(conf.get(GenomixJobConf.PORT)) == -1) {
- conf.set(GenomixJobConf.PORT, CCProperties.getProperty("CC_CLIENTPORT"));
- }
- if (conf.get(GenomixJobConf.FRAME_SIZE) == null)
- conf.set(GenomixJobConf.FRAME_SIZE,
- CCProperties.getProperty("FRAME_SIZE", String.valueOf(GenomixJobConf.DEFAULT_FRAME_SIZE)));
- }
-
- static void startNCs(NCTypes type) throws IOException {
- LOG.info("Starting NC's");
- String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
- + "startAllNCs.sh " + type;
- Process p = Runtime.getRuntime().exec(startNCCmd);
- try {
- p.waitFor(); // wait for ssh
- Thread.sleep(5000); // wait for NC -> CC registration
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (p.exitValue() != 0)
- throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
- + p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
- + IOUtils.toString(p.getErrorStream()));
- }
-
- static void shutdownNCs() throws IOException {
- LOG.info("Shutting down any previous NC's");
- String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
- + "stopAllNCs.sh";
- Process p = Runtime.getRuntime().exec(stopNCCmd);
- try {
- p.waitFor(); // wait for ssh
- // Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- static void startCC() throws IOException {
- LOG.info("Starting CC");
- String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
- + "startcc.sh";
- Process p = Runtime.getRuntime().exec(startCCCmd);
- try {
- p.waitFor(); // wait for cmd execution
- Thread.sleep(6000); // wait for CC registration
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (p.exitValue() != 0)
- throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
- + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
- + IOUtils.toString(p.getErrorStream()));
- }
-
- static void shutdownCC() throws IOException {
- LOG.info("Shutting down CC");
- String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
- Process p = Runtime.getRuntime().exec(stopCCCmd);
- try {
- p.waitFor(); // wait for cmd execution
- // Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // if (p.exitValue() != 0)
- // throw new RuntimeException("Failed to stop the genomix CC! Script returned exit code: " + p.exitValue()
- // + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
- // + IOUtils.toString(p.getErrorStream()));
- }
-
- static void addClusterShutdownHook(final boolean runLocal) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- LOG.info("Shutting down the cluster");
- try {
- if (runLocal)
- GenomixMiniCluster.deinit();
- else {
- DriverUtils.shutdownNCs();
- DriverUtils.shutdownCC();
- }
- } catch (Exception e) {
- System.err.println("Error while shutting the cluster down:");
- e.printStackTrace();
- }
- }
- });
- }
-
- public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
- LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
- GenomixJobConf.tick("copyLocalToHDFS");
- FileSystem dfs = FileSystem.get(conf);
- Path dest = new Path(destDir);
- dfs.delete(dest, true);
- dfs.mkdirs(dest);
-
- File srcBase = new File(localDir);
- if (srcBase.isDirectory())
- for (File f : srcBase.listFiles())
- dfs.copyFromLocalFile(new Path(f.toString()), dest);
- else
- dfs.copyFromLocalFile(new Path(localDir), dest);
-
- LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
- }
-
- public static void copyBinToLocal(JobConf conf, String hdfsSrcDir, String localDestDir) throws IOException {
- LOG.info("Copying HDFS directory " + hdfsSrcDir + " to local: " + localDestDir);
- GenomixJobConf.tick("copyBinToLocal");
- FileSystem dfs = FileSystem.get(conf);
- FileUtils.deleteQuietly(new File(localDestDir));
-
- // save original binary to output/bin
- dfs.copyToLocalFile(new Path(hdfsSrcDir), new Path(localDestDir + File.separator + "bin"));
-
- // convert hdfs sequence files to text as output/text
- BufferedWriter bw = null;
- SequenceFile.Reader reader = null;
- Writable key = null;
- Writable value = null;
- FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + File.separator + "*"));
- for (FileStatus f : files) {
- if (f.getLen() != 0 && !f.isDir()) {
- try {
- reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
- key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- if (bw == null)
- bw = new BufferedWriter(new FileWriter(localDestDir + File.separator + "data"));
- while (reader.next(key, value)) {
- if (key == null || value == null)
- break;
- bw.write(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- } catch (Exception e) {
- System.out.println("Encountered an error copying " + f + " to local:\n" + e);
- } finally {
- if (reader != null)
- reader.close();
- }
-
- }
- }
- if (bw != null)
- bw.close();
- LOG.info("Copy took " + GenomixJobConf.tock("copyBinToLocal") + "ms");
- }
-
- static void drawStatistics(JobConf conf, String inputStats, String outputChart) throws IOException {
- LOG.info("Getting coverage statistics...");
- GenomixJobConf.tick("drawStatistics");
- FileSystem dfs = FileSystem.get(conf);
-
- // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
- SequenceFile.Reader reader = null;
- VKmerBytesWritable key = null;
- NodeWritable value = null;
- TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
- FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
- for (FileStatus f : files) {
- if (f.getLen() != 0) {
- try {
- reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
- key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- if (key == null || value == null)
- break;
- Integer cov = java.lang.Math.round(value.getAverageCoverage());
- Long count = coverageCounts.get(cov);
- if (count == null)
- coverageCounts.put(cov, new Long(1));
- else
- coverageCounts.put(cov, count + 1);
- }
- } catch (Exception e) {
- System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
- } finally {
- if (reader != null)
- reader.close();
- }
- }
- }
-
- XYSeries series = new XYSeries("Kmer Coverage");
- for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
- series.add(pair.getKey().floatValue(), pair.getValue().longValue());
- }
- XYDataset xyDataset = new XYSeriesCollection(series);
- JFreeChart chart = ChartFactory.createXYLineChart("Coverage per kmer in " + new File(inputStats).getName(),
- "Coverage", "Count", xyDataset, PlotOrientation.VERTICAL, true, true, false);
-
- // Write the data to the output stream:
- FileOutputStream chartOut = new FileOutputStream(new File(outputChart));
- ChartUtilities.writeChartAsPNG(chartOut, chart, 800, 600);
- chartOut.flush();
- chartOut.close();
- LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
- }
-
- static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
- throws IOException {
- LOG.info("Dumping graph to fasta...");
- GenomixJobConf.tick("dumpGraph");
- FileSystem dfs = FileSystem.get(conf);
-
- // stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
- SequenceFile.Reader reader = null;
- VKmerBytesWritable key = null;
- NodeWritable value = null;
- BufferedWriter bw = null;
- FileStatus[] files = dfs.globStatus(new Path(inputGraph + File.separator + "*"));
- for (FileStatus f : files) {
- if (f.getLen() != 0) {
- try {
- reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
- key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- value = (NodeWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- if (bw == null)
- bw = new BufferedWriter(new FileWriter(outputFasta));
- while (reader.next(key, value)) {
- if (key == null || value == null)
- break;
- bw.write(">node_" + key.toString() + "\n");
- bw.write(followingBuild ? key.toString() : value.getInternalKmer().toString());
- bw.newLine();
- }
- } catch (Exception e) {
- System.out.println("Encountered an error getting stats for " + f + ":\n" + e);
- } finally {
- if (reader != null)
- reader.close();
- }
- }
- }
- if (bw != null)
- bw.close();
- LOG.info("Dump graph to fasta took " + GenomixJobConf.tock("dumpGraph") + "ms");
- }
-
-}
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
index adfae45..3312771 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
@@ -35,9 +35,10 @@
import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.config.GenomixJobConf.Patterns;
-import edu.uci.ics.genomix.driver.DriverUtils.NCTypes;
import edu.uci.ics.genomix.hyracks.graph.driver.Driver.Plan;
-import edu.uci.ics.genomix.minicluster.GenomixMiniCluster;
+import edu.uci.ics.genomix.minicluster.DriverUtils;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager;
+import edu.uci.ics.genomix.minicluster.GenomixClusterManager.ClusterType;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -64,43 +65,50 @@
private String prevOutput;
private String curOutput;
private int stepNum;
- private List<PregelixJob> jobs;
+ private List<PregelixJob> pregelixJobs;
private boolean followingBuild = false; // need to adapt the graph immediately after building
+ private boolean runLocal;
+ private int numCoresPerMachine;
+ private int numMachines;
+ private GenomixClusterManager manager;
private edu.uci.ics.genomix.hyracks.graph.driver.Driver hyracksDriver;
private edu.uci.ics.pregelix.core.driver.Driver pregelixDriver;
- private void buildGraphWithHyracks(GenomixJobConf conf) throws NumberFormatException, IOException {
- DriverUtils.shutdownNCs();
- DriverUtils.shutdownCC();
- DriverUtils.startCC();
- DriverUtils.startNCs(NCTypes.HYRACKS);
+ private void buildGraphWithHyracks(GenomixJobConf conf) throws Exception {
LOG.info("Building Graph using Hyracks...");
+ manager.startCluster(ClusterType.HYRACKS);
GenomixJobConf.tick("buildGraphWithHyracks");
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(conf.get(GenomixJobConf.IP_ADDRESS),
- Integer.parseInt(conf.get(GenomixJobConf.PORT)), Integer.parseInt(conf
- .get(GenomixJobConf.CORES_PER_MACHINE)));
+ String hyracksIP = runLocal ? GenomixClusterManager.LOCAL_IP : conf.get(GenomixJobConf.IP_ADDRESS);
+ int hyracksPort = runLocal ? GenomixClusterManager.LOCAL_HYRACKS_CLIENT_PORT : Integer.parseInt(conf.get(GenomixJobConf.PORT));
+ hyracksDriver = new edu.uci.ics.genomix.hyracks.graph.driver.Driver(hyracksIP, hyracksPort, numCoresPerMachine);
hyracksDriver.runJob(conf, Plan.BUILD_UNMERGED_GRAPH, Boolean.parseBoolean(conf.get(GenomixJobConf.PROFILE)));
followingBuild = true;
+ manager.stopCluster(ClusterType.HYRACKS);
LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHyracks") + "ms");
+ if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+ DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
}
- private void buildGraphWithHadoop(GenomixJobConf conf) throws IOException {
+ private void buildGraphWithHadoop(GenomixJobConf conf) throws Exception {
LOG.info("Building Graph using Hadoop...");
+ manager.startCluster(ClusterType.HADOOP);
GenomixJobConf.tick("buildGraphWithHadoop");
DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF)));
conf.writeXml(confOutput);
confOutput.close();
- // TODO copy the jars up to DFS
edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver hadoopDriver = new edu.uci.ics.genomix.hadoop.contrailgraphbuilding.GenomixDriver();
- hadoopDriver.run(prevOutput, curOutput, Integer.parseInt(conf.get(GenomixJobConf.CORES_PER_MACHINE)),
+ hadoopDriver.run(prevOutput, curOutput, numCoresPerMachine * numMachines,
Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH)), 4 * 100000, true, HADOOP_CONF);
FileUtils.deleteQuietly(new File(HADOOP_CONF));
System.out.println("Finished job Hadoop-Build-Graph");
followingBuild = true;
- LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop"));
+ manager.stopCluster(ClusterType.HADOOP);
+ LOG.info("Building the graph took " + GenomixJobConf.tock("buildGraphWithHadoop") + "ms");
+ if (Boolean.parseBoolean(conf.get(GenomixJobConf.DRAW_STATISTICS)))
+ DriverUtils.drawStatistics(conf, curOutput, new Path(curOutput).getName() + ".coverage.png");
}
@SuppressWarnings("deprecation")
@@ -114,27 +122,30 @@
private void addJob(PregelixJob job) {
if (followingBuild)
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- jobs.add(job);
+ pregelixJobs.add(job);
followingBuild = false;
}
public void runGenomix(GenomixJobConf conf) throws NumberFormatException, HyracksException, Exception {
+ LOG.info("Starting Genomix Assembler Pipeline...");
+ GenomixJobConf.tick("runGenomix");
+
DriverUtils.updateCCProperties(conf);
+ numCoresPerMachine = conf.get(GenomixJobConf.HYRACKS_IO_DIRS).split(",").length;
+ numMachines = conf.get(GenomixJobConf.HYRACKS_SLAVES).split("\r?\n|\r").length; // split on newlines
GenomixJobConf.setGlobalStaticConstants(conf);
followingBuild = Boolean.parseBoolean(conf.get(GenomixJobConf.FOLLOWS_GRAPH_BUILD));
- jobs = new ArrayList<PregelixJob>();
+ pregelixJobs = new ArrayList<PregelixJob>();
stepNum = 0;
- boolean dump = false;
- final boolean runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
- if (runLocal)
- GenomixMiniCluster.init(conf);
- DriverUtils.addClusterShutdownHook(runLocal); // *always* shut down any CCs or NCs we start
+ runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
+ manager = new GenomixClusterManager(runLocal, conf);
+ manager.stopCluster(ClusterType.HYRACKS); // shut down any existing NCs and CCs
String localInput = conf.get(GenomixJobConf.LOCAL_INPUT_DIR);
if (localInput != null) {
conf.set(GenomixJobConf.INITIAL_INPUT_DIR, conf.get(GenomixJobConf.HDFS_WORK_PATH) + File.separator
+ "00-initial-input-from-genomix-driver");
- DriverUtils.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
+ GenomixClusterManager.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
}
curOutput = conf.get(GenomixJobConf.INITIAL_INPUT_DIR);
@@ -193,53 +204,51 @@
setOutput(conf, Patterns.SCAFFOLD);
addJob(ScaffoldingVertex.getConfiguredJob(conf, ScaffoldingVertex.class));
break;
- case STATS:
- DriverUtils.drawStatistics(conf, curOutput, "coverage.png");
- break;
case DUMP_FASTA:
- dump = true;
+ DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
break;
}
}
- if (jobs.size() > 0) {
- DriverUtils.shutdownNCs();
- DriverUtils.shutdownCC();
- DriverUtils.startCC();
- DriverUtils.startNCs(NCTypes.PREGELIX);
+ if (pregelixJobs.size() > 0) {
+ manager.startCluster(ClusterType.PREGELIX);
pregelixDriver = new edu.uci.ics.pregelix.core.driver.Driver(this.getClass());
- }
- // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
- // this would let them resume at arbitrary points of the pipeline
- if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
- for (int i = 0; i < jobs.size(); i++) {
- LOG.info("Starting job " + jobs.get(i).getJobName());
- GenomixJobConf.tick("pregelix-job");
-
- pregelixDriver.runJob(jobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
- Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-
- LOG.info("Finished job " + jobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+ String pregelixIP = runLocal ? GenomixClusterManager.LOCAL_IP : conf.get(GenomixJobConf.IP_ADDRESS);
+ int pregelixPort = runLocal ? GenomixClusterManager.LOCAL_PREGELIX_CLIENT_PORT : Integer.parseInt(conf.get(GenomixJobConf.PORT));
+ // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
+ // this would let them resume at arbitrary points of the pipeline
+ if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
+ LOG.info("Starting pregelix job series (saving intermediate results)...");
+ GenomixJobConf.tick("pregelix-runJob-one-by-one");
+ for (int i = 0; i < pregelixJobs.size(); i++) {
+ LOG.info("Starting job " + pregelixJobs.get(i).getJobName());
+ GenomixJobConf.tick("pregelix-job");
+ pregelixDriver.runJob(pregelixJobs.get(i), pregelixIP, pregelixPort);
+ LOG.info("Finished job " + pregelixJobs.get(i).getJobName() + " in "
+ + GenomixJobConf.tock("pregelix-job"));
+ }
+ LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJob-one-by-one"));
+ } else {
+ LOG.info("Starting pregelix job series (not saving intermediate results...");
+ GenomixJobConf.tick("pregelix-runJobs");
+ pregelixDriver.runJobs(pregelixJobs, pregelixIP, pregelixPort);
+ LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
}
- } else {
- LOG.info("Starting pregelix job series...");
- GenomixJobConf.tick("pregelix-runJobs");
- pregelixDriver.runJobs(jobs, conf.get(GenomixJobConf.IP_ADDRESS),
- Integer.parseInt(conf.get(GenomixJobConf.PORT)));
- LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
+ manager.stopCluster(ClusterType.PREGELIX);
}
if (conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR) != null)
- DriverUtils.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
- if (dump)
- DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
+ GenomixClusterManager.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
if (conf.get(GenomixJobConf.FINAL_OUTPUT_DIR) != null)
FileSystem.get(conf).rename(new Path(curOutput), new Path(GenomixJobConf.FINAL_OUTPUT_DIR));
-
+
+ LOG.info("Finished the Genomix Assembler Pipeline in " + GenomixJobConf.tock("runGenomix") + "ms!");
}
public static void main(String[] args) throws CmdLineException, NumberFormatException, HyracksException, Exception {
- String[] myArgs = { "-kmerLength", "5", "-coresPerMachine", "2",
+ String[] myArgs = {
+ "-runLocal", "true",
+ "-kmerLength", "5",
// "-saveIntermediateResults", "true",
// "-localInput", "../genomix-pregelix/data/input/reads/synthetic/",
"-localInput", "../genomix-pregelix/data/input/reads/pathmerge",
@@ -251,14 +260,16 @@
// "-pipelineOrder", "BUILD,MERGE",
// "-inputDir", "/home/wbiesing/code/hyracks/genomix/genomix-driver/graphbuild.binmerge",
// "-localInput", "../genomix-pregelix/data/TestSet/PathMerge/CyclePath/bin/part-00000",
- "-pipelineOrder", "BUILD_HYRACKS,MERGE" };
+ "-pipelineOrder", "MERGE" };
+ // allow Eclipse to run the maven-generated scripts
if (System.getProperty("app.home") == null)
- System.setProperty("app.home", "/home/wbiesing/code/hyracks/genomix/genomix-driver/target/appassembler");
+ System.setProperty("app.home", new File("target/appassembler").getAbsolutePath());
// Patterns.BUILD, Patterns.MERGE,
// Patterns.TIP_REMOVE, Patterns.MERGE,
// Patterns.BUBBLE, Patterns.MERGE,
 GenomixJobConf conf = GenomixJobConf.fromArguments(args);
GenomixDriver driver = new GenomixDriver();
driver.runGenomix(conf);
}
diff --git a/genomix/genomix-driver/src/main/resources/conf/cluster.properties b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
index 66251be..57aa1fa 100644
--- a/genomix/genomix-driver/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-driver/src/main/resources/conf/cluster.properties
@@ -1,11 +1,14 @@
#The CC port for Hyracks clients
+# don't change unless you want multiple CCs on one machine
CC_CLIENTPORT=3099
#The CC port for Hyracks cluster management
+# don't change unless you want multiple CCs on one machine
CC_CLUSTERPORT=1099
-#The directory of hyracks binaries
-HYRACKS_HOME="../../../../hyracks"
+#Sets the http port for the Cluster Controller (default: 16001)
+# don't change unless you want multiple CCs on one machine
+CC_HTTPPORT=16001
WORKPATH=""
#The tmp directory for cc to install jars
@@ -30,12 +33,18 @@
CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
#The frame size of the internal dataflow engine
-FRAME_SIZE=65536
+FRAME_SIZE=65535
+
+#The frame limit of the internal dataflow engine
+FRAME_LIMIT=4096
+
+#The number of jobs to keep logs for
+JOB_HISTORY_SIZE=50
#CC JAVA_OPTS
-CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx5g -Djava.util.logging.config.file=conf/logging.properties"
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
#NC JAVA_OPTS
-NCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=conf/logging.properties"
diff --git a/genomix/genomix-driver/src/main/resources/conf/logging.properties b/genomix/genomix-driver/src/main/resources/conf/logging.properties
new file mode 100644
index 0000000..a0f9d77
--- /dev/null
+++ b/genomix/genomix-driver/src/main/resources/conf/logging.properties
@@ -0,0 +1,68 @@
+############################################################
+# Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+# Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler
+# classes. These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+#handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers. For any given facility this global level
+# can be overridden by a facility specific level.
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+#.level= SEVERE
+.level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+# java.util.logging.FileHandler.formatter = java.util.logging.SimpleFormatter
+
+# Limit the messages that are printed on the console to FINEST and above.
+
+java.util.logging.ConsoleHandler.level = FINEST
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.genomix.pregelix.level = INFO
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+#edu.uci.ics.hyracks.level = SEVERE
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
diff --git a/genomix/genomix-driver/src/main/resources/scripts/getip.sh b/genomix/genomix-driver/src/main/resources/scripts/getip.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
new file mode 100755
index 0000000..83a2fd5
--- /dev/null
+++ b/genomix/genomix-driver/src/main/resources/scripts/makeScratchDirs.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+set -x
+
+GENOMIX_HOME="$( dirname "$( cd "$(dirname "$0")" ; pwd -P )" )" # script's parent dir's parent
+cd "$GENOMIX_HOME"
+
+. conf/cluster.properties
+. conf/stores.properties
+
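+# create the store and iodevice scratch directories on every slave listed in conf/slaves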
+for i in `cat conf/slaves`
+do
+ DIRS=`echo $store | tr "," " "`
+ # ssh to the slave machine and create the store directories
+ ssh -n $i "mkdir -p $DIRS"
+
+ DIRS=`echo $IO_DIRS | tr "," " "`
+ ssh -n $i "mkdir -p $DIRS"
+done
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/startAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-driver/src/main/resources/scripts/startDebugNc.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
old mode 100644
new mode 100755
index cf7a0b4..e94ad17
--- a/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startcc.sh
@@ -14,7 +14,7 @@
. conf/cluster.properties
#Get the IP address of the cc
CCHOST_NAME=`cat conf/master`
-CCHOST=`bin/getip.sh`
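+# run getip.sh on the master itself so the address is the master's, not the invoking machine's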
+CCHOST=`ssh -n ${CCHOST_NAME} "${GENOMIX_HOME}/bin/getip.sh"`
#Remove the temp dir
#rm -rf $CCTMP_DIR
@@ -29,15 +29,30 @@
export JAVA_OPTS=$CCJAVA_OPTS
cd $CCTMP_DIR
-#Launch hyracks cc script
+#Prepare cc script
+CMD="\"${GENOMIX_HOME}/bin/genomixcc\" -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST"
+
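+# append optional flags only when the corresponding cluster.properties values are set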
+if [ -n "$CC_CLIENTPORT" ]; then
+ CMD="$CMD -client-net-port $CC_CLIENTPORT"
+fi
+if [ -n "$CC_CLUSTERPORT" ]; then
+ CMD="$CMD -cluster-net-port $CC_CLUSTERPORT"
+fi
+if [ -n "$CC_HTTPPORT" ]; then
+ CMD="$CMD -http-port $CC_HTTPPORT"
+fi
+if [ -n "$JOB_HISTORY_SIZE" ]; then
+ CMD="$CMD -job-history-size $JOB_HISTORY_SIZE"
+fi
if [ -f "${GENOMIX_HOME}/conf/topology.xml" ]; then
-#Launch hyracks cc script with topology
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "${GENOMIX_HOME}/conf/topology.xml" &> "$CCLOGS_DIR"/cc.log &
-else
-#Launch hyracks cc script without toplogy
-"${GENOMIX_HOME}"/bin/genomixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> "$CCLOGS_DIR"/cc.log &
+CMD="$CMD -cluster-topology \"${GENOMIX_HOME}/conf/topology.xml\""
fi
+#Launch cc script
+printf "\n\n\n********************************************\nStarting CC with command %s\n\n" "$CMD" >> "$CCLOGS_DIR"/cc.log
+eval "$CMD >>\"$CCLOGS_DIR/cc.log\" 2>&1 &"
+
+# save the PID of the process we just launched
PID=$!
echo "master: "`hostname`$'\t'$PID
-echo $PID > "$GENOMIX_HOME"/conf/cc.pid
+echo $PID > "$GENOMIX_HOME/conf/cc.pid"
diff --git a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
old mode 100644
new mode 100755
index 448b17d..02d8f97
--- a/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/startnc.sh
@@ -57,8 +57,12 @@
exit 1
fi
-#Launch hyracks nc
-"${GENOMIX_HOME}"/bin/$NCTYPE -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> "$NCLOGS_DIR"/$NODEID.log &
+CMD="\"${GENOMIX_HOME}/bin/$NCTYPE\" -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices \"${IO_DIRS}\""
+
+printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "$CMD" >> "$NCLOGS_DIR"/$NODEID.log
+
+#Launch nc
+eval "$CMD >> \"$NCLOGS_DIR/$NODEID.log\" 2>&1 &"
echo $! # write PID of bg'ed script
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh b/genomix/genomix-driver/src/main/resources/scripts/stopAllNCs.sh
old mode 100644
new mode 100755
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
old mode 100644
new mode 100755
index 41efa0f..ff9dfc4
--- a/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopcc.sh
@@ -17,7 +17,8 @@
echo "Stopped CC on master: "`hostname`$'\t'$PID
fi
-#Clean up CC temp dir
-rm -rf $CCTMP_DIR/*
+#Clean up CC temp dir but keep the default logs directory
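+# (extglob enables the !(pattern) "everything except" glob used below)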
+shopt -s extglob
+rm -rf $CCTMP_DIR/!(logs)
diff --git a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
old mode 100644
new mode 100755
index 4007089..14df6ba
--- a/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
+++ b/genomix/genomix-driver/src/main/resources/scripts/stopnc.sh
@@ -17,5 +17,6 @@
rm -rf $io_dir/*
done
-#Clean up NC temp dir
-rm -rf $NCTMP_DIR/*
+#Clean up NC temp dir but keep the default logs directory
+shopt -s extglob
+rm -rf $NCTMP_DIR/!(logs)
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
index 960a342..0c7c61b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/driver/Driver.java
@@ -159,7 +159,8 @@
String ipAddress = jobConf.get(GenomixJobConf.IP_ADDRESS);
int port = Integer.parseInt(jobConf.get(GenomixJobConf.PORT));
- int numOfDuplicate = jobConf.getInt(GenomixJobConf.CORES_PER_MACHINE, 4);
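+ // derive per-machine parallelism from the iodevices list (one partition per I/O directory) instead of CORES_PER_MACHINE; default to 4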
+ String IODirs = jobConf.get(GenomixJobConf.HYRACKS_IO_DIRS, null);
+ int numOfDuplicate = IODirs != null ? IODirs.split(",").length : 4;
boolean bProfiling = jobConf.getBoolean(GenomixJobConf.PROFILE, true);
jobConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
jobConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
index ffdb92e..56b59c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/graph/job/JobGenBrujinGraph.java
@@ -80,6 +80,8 @@
protected ConfFactory hadoopJobConfFactory;
protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
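+ // local fallback defaults; these match FRAME_SIZE and FRAME_LIMIT in conf/cluster.properties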
+ private static final int DEFAULT_FRAME_LIMIT = 4096;
+ private static final int DEFAULT_FRAME_SIZE = 65535;
protected String[] ncNodeNames;
protected String[] readSchedule;
@@ -236,10 +238,10 @@
protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
Configuration conf = confFactory.getConf();
kmerSize = Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH));
- frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
- System.out.println(GenomixJobConf.DEFAULT_FRAME_SIZE);
+ frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, DEFAULT_FRAME_LIMIT);
+// tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ System.out.println(DEFAULT_FRAME_SIZE);
System.out.println(frameSize);
String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
groupbyType = GroupbyType.PRECLUSTER;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 85ecf71..b2c3767 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -74,6 +74,13 @@
if (maxIteration < 0)
maxIteration = Integer.parseInt(getContext().getConfiguration().get(GenomixJobConf.GRAPH_CLEAN_MAX_ITERATIONS));
GenomixJobConf.setGlobalStaticConstants(getContext().getConfiguration());
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 3520914..8678e9e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -47,8 +47,8 @@
receivedMsgList.clear();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 42dd1e1..482306a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -82,8 +82,8 @@
}
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index aecd546..3c9ac3d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -69,8 +69,8 @@
tmpValue.reset();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
index b7005f9..c796432 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
@@ -50,8 +50,8 @@
}
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
index dfe76a9..9782637 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/scaffolding/ScaffoldingVertex.java
@@ -44,8 +44,8 @@
super.initVertex();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
if(getSuperstep() == 1)
ScaffoldingAggregator.preScaffoldingMap.clear();
else if(getSuperstep() == 2)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 90aa571..a075844 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -97,8 +97,8 @@
tmpOutgoingEdge = new EdgeWritable();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index eebaab1..679a472 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -41,8 +41,8 @@
destVertexId = new VKmerBytesWritable();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
index 36a2e54..de2be35 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
@@ -43,8 +43,8 @@
repeatKmer = new VKmerBytesWritable();
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
- else
- StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+// else
+// StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
counters.clear();
getVertexValue().getCounters().clear();
}