Add some logging and attempt to shut down servers on exception
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
index 35b510d..7955b72 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
@@ -88,9 +88,11 @@
}
private void startNCs(NCTypes type) throws IOException {
+ LOG.info("Starting NC's");
shutdownNCs();
curNC = type;
String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "startAllNCs.sh " + type;
+ System.out.println(startNCCmd);
Process p = Runtime.getRuntime().exec(startNCCmd);
try {
p.waitFor(); // wait for ssh
@@ -103,12 +105,14 @@
}
private void shutdownNCs() throws IOException {
+ LOG.info("Shutting down NC's");
if (curNC == null) // nothing started yet
return;
switch(curNC) {
case HYRACKS:
case PREGELIX:
String stopNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopAllNCs.sh " + curNC;
+ System.out.println(stopNCCmd);
Process p = Runtime.getRuntime().exec(stopNCCmd);
try {
p.waitFor(); // wait for ssh
@@ -123,6 +127,7 @@
}
private void startCC() throws IOException {
+ LOG.info("Starting CC");
String startCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "startcc.sh";
Process p = Runtime.getRuntime().exec(startCCCmd);
try {
@@ -136,6 +141,7 @@
}
private void shutdownCC() throws IOException {
+ LOG.info("Shutting down CC");
String stopCCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator + "stopcc.sh";
Process p = Runtime.getRuntime().exec(stopCCCmd);
try {
@@ -183,117 +189,127 @@
stepNum = 0;
boolean dump = false;
boolean runLocal = Boolean.parseBoolean(conf.get(GenomixJobConf.RUN_LOCAL));
- if (runLocal)
- GenomixMiniCluster.init(conf);
- else
- startCC();
-
- String localInput = conf.get(GenomixJobConf.LOCAL_INPUT_DIR);
- if (localInput != null) {
- conf.set(GenomixJobConf.INITIAL_INPUT_DIR, conf.get(GenomixJobConf.HDFS_WORK_PATH) + File.separator
- + "00-initial-input-from-genomix-driver");
- DriverUtils.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
- }
- curOutput = conf.get(GenomixJobConf.INITIAL_INPUT_DIR);
-
- // currently, we just iterate over the jobs set in conf[PIPELINE_ORDER]. In the future, we may want more logic to iterate multiple times, etc
- String pipelineSteps = conf.get(GenomixJobConf.PIPELINE_ORDER);
- for (Patterns step : Patterns.arrayFromString(pipelineSteps)) {
- stepNum++;
- switch (step) {
- case BUILD:
- case BUILD_HYRACKS:
- setOutput(conf, Patterns.BUILD_HYRACKS);
- buildGraphWithHyracks(conf);
- break;
- case BUILD_HADOOP:
- setOutput(conf, Patterns.BUILD_HADOOP);
- buildGraphWithHadoop(conf);
- break;
- case MERGE_P1:
- setOutput(conf, Patterns.MERGE_P1);
- addJob(P1ForPathMergeVertex.getConfiguredJob(conf));
- break;
- case MERGE_P2:
- setOutput(conf, Patterns.MERGE_P2);
- addJob(P2ForPathMergeVertex.getConfiguredJob(conf));
- break;
- case MERGE:
- case MERGE_P4:
- setOutput(conf, Patterns.MERGE_P4);
- addJob(P4ForPathMergeVertex.getConfiguredJob(conf));
- break;
- case UNROLL_TANDEM:
- setOutput(conf, Patterns.UNROLL_TANDEM);
- addJob(UnrollTandemRepeat.getConfiguredJob(conf));
- break;
- case TIP_REMOVE:
- setOutput(conf, Patterns.TIP_REMOVE);
- addJob(TipRemoveVertex.getConfiguredJob(conf));
- break;
- case BUBBLE:
- setOutput(conf, Patterns.BUBBLE);
- addJob(BubbleMergeVertex.getConfiguredJob(conf));
- break;
- case LOW_COVERAGE:
- setOutput(conf, Patterns.LOW_COVERAGE);
- addJob(RemoveLowCoverageVertex.getConfiguredJob(conf));
- break;
- case BRIDGE:
- setOutput(conf, Patterns.BRIDGE);
- addJob(BridgeRemoveVertex.getConfiguredJob(conf));
- break;
- case SPLIT_REPEAT:
- setOutput(conf, Patterns.SPLIT_REPEAT);
- addJob(SplitRepeatVertex.getConfiguredJob(conf));
- break;
- case SCAFFOLD:
- setOutput(conf, Patterns.SCAFFOLD);
- addJob(ScaffoldingVertex.getConfiguredJob(conf));
- break;
- case STATS:
- DriverUtils.drawStatistics(conf, curOutput, "coverage.png");
- break;
- case DUMP_FASTA:
- dump = true;
- break;
+ try {
+ if (runLocal)
+ GenomixMiniCluster.init(conf);
+ else
+ startCC();
+
+ String localInput = conf.get(GenomixJobConf.LOCAL_INPUT_DIR);
+ if (localInput != null) {
+ conf.set(GenomixJobConf.INITIAL_INPUT_DIR, conf.get(GenomixJobConf.HDFS_WORK_PATH) + File.separator
+ + "00-initial-input-from-genomix-driver");
+ DriverUtils.copyLocalToHDFS(conf, localInput, conf.get(GenomixJobConf.INITIAL_INPUT_DIR));
}
- }
-
- if (jobs.size() > 0)
- startNCs(NCTypes.PREGELIX);
- // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
- // this would let them resume at arbitrary points of the pipeline
- if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
- for (int i = 0; i < jobs.size(); i++) {
- LOG.info("Starting job " + jobs.get(i).getJobName());
- GenomixJobConf.tick("pregelix-job");
-
- pregelixDriver.runJob(jobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
+ curOutput = conf.get(GenomixJobConf.INITIAL_INPUT_DIR);
+
+ // currently, we just iterate over the jobs set in conf[PIPELINE_ORDER]. In the future, we may want more logic to iterate multiple times, etc
+ String pipelineSteps = conf.get(GenomixJobConf.PIPELINE_ORDER);
+ for (Patterns step : Patterns.arrayFromString(pipelineSteps)) {
+ stepNum++;
+ switch (step) {
+ case BUILD:
+ case BUILD_HYRACKS:
+ setOutput(conf, Patterns.BUILD_HYRACKS);
+ buildGraphWithHyracks(conf);
+ break;
+ case BUILD_HADOOP:
+ setOutput(conf, Patterns.BUILD_HADOOP);
+ buildGraphWithHadoop(conf);
+ break;
+ case MERGE_P1:
+ setOutput(conf, Patterns.MERGE_P1);
+ addJob(P1ForPathMergeVertex.getConfiguredJob(conf));
+ break;
+ case MERGE_P2:
+ setOutput(conf, Patterns.MERGE_P2);
+ addJob(P2ForPathMergeVertex.getConfiguredJob(conf));
+ break;
+ case MERGE:
+ case MERGE_P4:
+ setOutput(conf, Patterns.MERGE_P4);
+ addJob(P4ForPathMergeVertex.getConfiguredJob(conf));
+ break;
+ case UNROLL_TANDEM:
+ setOutput(conf, Patterns.UNROLL_TANDEM);
+ addJob(UnrollTandemRepeat.getConfiguredJob(conf));
+ break;
+ case TIP_REMOVE:
+ setOutput(conf, Patterns.TIP_REMOVE);
+ addJob(TipRemoveVertex.getConfiguredJob(conf));
+ break;
+ case BUBBLE:
+ setOutput(conf, Patterns.BUBBLE);
+ addJob(BubbleMergeVertex.getConfiguredJob(conf));
+ break;
+ case LOW_COVERAGE:
+ setOutput(conf, Patterns.LOW_COVERAGE);
+ addJob(RemoveLowCoverageVertex.getConfiguredJob(conf));
+ break;
+ case BRIDGE:
+ setOutput(conf, Patterns.BRIDGE);
+ addJob(BridgeRemoveVertex.getConfiguredJob(conf));
+ break;
+ case SPLIT_REPEAT:
+ setOutput(conf, Patterns.SPLIT_REPEAT);
+ addJob(SplitRepeatVertex.getConfiguredJob(conf));
+ break;
+ case SCAFFOLD:
+ setOutput(conf, Patterns.SCAFFOLD);
+ addJob(ScaffoldingVertex.getConfiguredJob(conf));
+ break;
+ case STATS:
+ DriverUtils.drawStatistics(conf, curOutput, "coverage.png");
+ break;
+ case DUMP_FASTA:
+ dump = true;
+ break;
+ }
+ }
+
+ if (jobs.size() > 0)
+ startNCs(NCTypes.PREGELIX);
+ // if the user wants to, we can save the intermediate results to HDFS (running each job individually)
+ // this would let them resume at arbitrary points of the pipeline
+ if (Boolean.parseBoolean(conf.get(GenomixJobConf.SAVE_INTERMEDIATE_RESULTS))) {
+ for (int i = 0; i < jobs.size(); i++) {
+ LOG.info("Starting job " + jobs.get(i).getJobName());
+ GenomixJobConf.tick("pregelix-job");
+
+ pregelixDriver.runJob(jobs.get(i), conf.get(GenomixJobConf.IP_ADDRESS),
+ Integer.parseInt(conf.get(GenomixJobConf.PORT)));
+
+ LOG.info("Finished job " + jobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+ }
+ } else {
+ LOG.info("Starting pregelix job series...");
+ GenomixJobConf.tick("pregelix-runJobs");
+ pregelixDriver.runJobs(jobs, conf.get(GenomixJobConf.IP_ADDRESS),
Integer.parseInt(conf.get(GenomixJobConf.PORT)));
-
- LOG.info("Finished job " + jobs.get(i).getJobName() + " in " + GenomixJobConf.tock("pregelix-job"));
+ LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
}
- } else {
- LOG.info("Starting pregelix job series...");
- GenomixJobConf.tick("pregelix-runJobs");
- pregelixDriver.runJobs(jobs, conf.get(GenomixJobConf.IP_ADDRESS),
- Integer.parseInt(conf.get(GenomixJobConf.PORT)));
- LOG.info("Finished job series in " + GenomixJobConf.tock("pregelix-runJobs"));
- }
-
- if (conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR) != null)
- DriverUtils.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
- if (dump)
- DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
- if (conf.get(GenomixJobConf.FINAL_OUTPUT_DIR) != null)
- FileSystem.get(conf).rename(new Path(curOutput), new Path(GenomixJobConf.FINAL_OUTPUT_DIR));
-
- if (runLocal)
- GenomixMiniCluster.deinit();
- else {
- shutdownNCs();
- shutdownCC();
+
+ if (conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR) != null)
+ DriverUtils.copyBinToLocal(conf, curOutput, conf.get(GenomixJobConf.LOCAL_OUTPUT_DIR));
+ if (dump)
+ DriverUtils.dumpGraph(conf, curOutput, "genome.fasta", followingBuild);
+ if (conf.get(GenomixJobConf.FINAL_OUTPUT_DIR) != null)
+ FileSystem.get(conf).rename(new Path(curOutput), new Path(GenomixJobConf.FINAL_OUTPUT_DIR));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ try {
+ System.out.println("Shutting down cluster");
+ if (runLocal)
+ GenomixMiniCluster.deinit();
+ else {
+ shutdownNCs();
+ shutdownCC();
+ }
+ } catch (Exception e) {
+ System.out.println("Exception raise while shutting down cluster:");
+ e.printStackTrace(System.err);
+ }
}
}