auto-discover CC port and IP settings (broke runLocal)
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
index e2a5343..34983c3 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/config/GenomixJobConf.java
@@ -110,7 +110,7 @@
@Option(name = "-coresPerMachine", usage="the number of cores available in each machine", required=false)
private int coresPerMachine = -1;
- @Option(name = "-runLocal", usage = "Run a local instance using the Hadoop MiniCluster. NOTE: overrides settings for -ip and -port", required=false)
+ @Option(name = "-runLocal", usage = "Run a local instance using the Hadoop MiniCluster. NOTE: overrides settings for -ip and -port and those in conf/*.properties", required=false)
private boolean runLocal = false;
@Argument
@@ -299,9 +299,9 @@
if (Integer.parseInt(conf.get(TIP_REMOVE_MAX_LENGTH)) < kmerLength)
throw new IllegalArgumentException("tipRemove_maxLength must be at least as long as kmerLength!");
- // Hyracks/Pregelix Advanced Setup
- if (conf.get(IP_ADDRESS) == null)
- throw new IllegalArgumentException("ipAddress was not specified!");
+// // Hyracks/Pregelix Advanced Setup
+// if (conf.get(IP_ADDRESS) == null)
+// throw new IllegalArgumentException("ipAddress was not specified!");
}
private void fillMissingDefaults() {
@@ -341,11 +341,11 @@
if (getInt(CORES_PER_MACHINE, -1) == -1)
setInt(CORES_PER_MACHINE, 4);
- if (getBoolean(RUN_LOCAL, false)) {
- // override any other settings for HOST and PORT
- set(IP_ADDRESS, PregelixHyracksIntegrationUtil.CC_HOST);
- setInt(PORT, PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
- }
+// if (getBoolean(RUN_LOCAL, false)) {
+// // override any other settings for HOST and PORT
+// set(IP_ADDRESS, PregelixHyracksIntegrationUtil.CC_HOST);
+// setInt(PORT, PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+// }
}
private void setFromOpts(Options opts) {
@@ -371,8 +371,10 @@
setBoolean(SAVE_INTERMEDIATE_RESULTS, opts.saveIntermediateResults);
- if (opts.runLocal && (opts.ipAddress != null || opts.port != -1))
- throw new IllegalArgumentException("Option -runLocal cannot be set at the same time as -port or -ip! (-runLocal starts a cluster; -ip and -port specify an existing cluster)");
+// if (opts.runLocal && (opts.ipAddress != null || opts.port != -1))
+// throw new IllegalArgumentException("Option -runLocal cannot be set at the same time as -port or -ip! (-runLocal starts a cluster; -ip and -port specify an existing cluster)");
+ if (opts.runLocal)
+ throw new IllegalArgumentException("runLocal is currently unsupported!");
setBoolean(RUN_LOCAL, opts.runLocal);
// Hyracks/Pregelix Setup
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
index 6de23b1..e448c0f 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/DriverUtils.java
@@ -2,9 +2,12 @@
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Properties;
import java.util.TreeMap;
import java.util.Map.Entry;
@@ -30,9 +33,10 @@
import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class DriverUtils {
-
+
enum NCTypes {
HYRACKS,
PREGELIX
@@ -40,6 +44,35 @@
private static final Log LOG = LogFactory.getLog(DriverUtils.class);
+    /**
+     * Get the IP address of a node by running the bin/getip.sh script on it over ssh.
+     *
+     * @param hostName
+     *            host to ssh into; the script path is built from the local "app.home" system property
+     *            (defaulting to ".") and is resolved on that host
+     * @return the trimmed stdout of the script
+     * @throws IOException
+     *             if the ssh process cannot be started or its output cannot be read
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while waiting for ssh to exit
+     * @throws RuntimeException
+     *             if the script exits with a non-zero code or prints nothing on stdout
+     */
+    static String getIP(String hostName) throws IOException, InterruptedException {
+        String scriptPath = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
+                + "getip.sh";
+        // Use an argument vector so the local JVM never splits the path on whitespace;
+        // the embedded quotes are kept for the remote shell that ssh hands the command to.
+        String[] getIPCmd = { "ssh", "-n", hostName, "\"" + scriptPath + "\"" };
+        LOG.info("Getting IP address: ssh -n " + hostName + " \"" + scriptPath + "\"");
+        Process p = Runtime.getRuntime().exec(getIPCmd);
+        // Drain the child's output BEFORE waiting on it: a child blocked writing to a full pipe never exits.
+        String stdout = IOUtils.toString(p.getInputStream()).trim();
+        String stderr = IOUtils.toString(p.getErrorStream());
+        int exitCode = p.waitFor(); // wait for ssh
+        if (exitCode != 0)
+            throw new RuntimeException("Failed to get the ip address of the master node! Script returned exit code: "
+                    + exitCode + "\nstdout: " + stdout + "\nstderr: " + stderr);
+        if (stdout.isEmpty())
+            throw new RuntimeException("Failed to get the ip address of the master node! Script printed nothing."
+                    + "\nstderr: " + stderr);
+        return stdout;
+    }
+
+    /**
+     * Fill in the CC's IP address and client port on conf when they were not given explicitly.
+     * The IP address comes from running the `getip.sh` script against localhost; the port is the
+     * CC_CLIENTPORT entry of conf/cluster.properties under the "app.home" system property
+     * (defaulting to "."). Settings already present in conf are left untouched.
+     *
+     * @param conf
+     *            job configuration to update in place
+     * @throws FileNotFoundException
+     *             if cluster.properties does not exist
+     * @throws IOException
+     *             if cluster.properties cannot be read or the ssh process fails to start
+     * @throws InterruptedException
+     *             if interrupted while waiting for the `getip.sh` script
+     * @throws RuntimeException
+     *             if cluster.properties has no CC_CLIENTPORT value
+     */
+    static void updateCCProperties(GenomixJobConf conf) throws FileNotFoundException, IOException, InterruptedException {
+        if (conf.get(GenomixJobConf.IP_ADDRESS) == null)
+            conf.set(GenomixJobConf.IP_ADDRESS, getIP("localhost"));
+
+        // A port that was never set is treated the same as the -1 "unset" marker.
+        String configuredPort = conf.get(GenomixJobConf.PORT);
+        if (configuredPort == null || Integer.parseInt(configuredPort) == -1) {
+            String propertiesPath = System.getProperty("app.home", ".") + File.separator + "conf" + File.separator
+                    + "cluster.properties";
+            Properties ccProperties = new Properties();
+            FileInputStream in = new FileInputStream(propertiesPath);
+            try {
+                ccProperties.load(in);
+            } finally {
+                in.close(); // Properties.load() leaves the stream open
+            }
+            String clientPort = ccProperties.getProperty("CC_CLIENTPORT");
+            if (clientPort == null || clientPort.trim().isEmpty())
+                throw new RuntimeException("CC_CLIENTPORT is not set in " + propertiesPath);
+            // Properties keeps trailing whitespace on values; strip it so the port parses later.
+            conf.set(GenomixJobConf.PORT, clientPort.trim());
+        }
+    }
+
static void startNCs(NCTypes type) throws IOException {
LOG.info("Starting NC's");
String startNCCmd = System.getProperty("app.home", ".") + File.separator + "bin" + File.separator
@@ -54,7 +87,7 @@
if (p.exitValue() != 0)
throw new RuntimeException("Failed to start the" + type + " NC's! Script returned exit code: "
+ p.exitValue() + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
- + IOUtils.toString(p.getInputStream()));
+ + IOUtils.toString(p.getErrorStream()));
}
static void shutdownNCs() throws IOException {
@@ -64,7 +97,7 @@
Process p = Runtime.getRuntime().exec(stopNCCmd);
try {
p.waitFor(); // wait for ssh
-// Thread.sleep(3000);
+ // Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -84,7 +117,7 @@
if (p.exitValue() != 0)
throw new RuntimeException("Failed to start the genomix CC! Script returned exit code: " + p.exitValue()
+ "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
- + IOUtils.toString(p.getInputStream()));
+ + IOUtils.toString(p.getErrorStream()));
}
static void shutdownCC() throws IOException {
@@ -93,16 +126,16 @@
Process p = Runtime.getRuntime().exec(stopCCCmd);
try {
p.waitFor(); // wait for cmd execution
-// Thread.sleep(2000);
+ // Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
-// if (p.exitValue() != 0)
-// throw new RuntimeException("Failed to stop the genomix CC! Script returned exit code: " + p.exitValue()
-// + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
-// + IOUtils.toString(p.getInputStream()));
+ // if (p.exitValue() != 0)
+ // throw new RuntimeException("Failed to stop the genomix CC! Script returned exit code: " + p.exitValue()
+ // + "\nstdout: " + IOUtils.toString(p.getInputStream()) + "\nstderr: "
+ // + IOUtils.toString(p.getErrorStream()));
}
-
+
public static void copyLocalToHDFS(JobConf conf, String localDir, String destDir) throws IOException {
LOG.info("Copying local directory " + localDir + " to HDFS: " + destDir);
GenomixJobConf.tick("copyLocalToHDFS");
@@ -117,7 +150,7 @@
dfs.copyFromLocalFile(new Path(f.toString()), dest);
else
dfs.copyFromLocalFile(new Path(localDir), dest);
-
+
LOG.info("Copy took " + GenomixJobConf.tock("copyLocalToHDFS") + "ms");
}
@@ -168,12 +201,12 @@
LOG.info("Getting coverage statistics...");
GenomixJobConf.tick("drawStatistics");
FileSystem dfs = FileSystem.get(conf);
-
+
// stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
SequenceFile.Reader reader = null;
VKmerBytesWritable key = null;
NodeWritable value = null;
- TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
+ TreeMap<Integer, Long> coverageCounts = new TreeMap<Integer, Long>();
FileStatus[] files = dfs.globStatus(new Path(inputStats + File.separator + "*"));
for (FileStatus f : files) {
if (f.getLen() != 0) {
@@ -188,7 +221,7 @@
Long count = coverageCounts.get(cov);
if (count == null)
coverageCounts.put(cov, new Long(1));
- else
+ else
coverageCounts.put(cov, count + 1);
}
} catch (Exception e) {
@@ -199,7 +232,7 @@
}
}
}
-
+
XYSeries series = new XYSeries("Kmer Coverage");
for (Entry<Integer, Long> pair : coverageCounts.entrySet()) {
series.add(pair.getKey().floatValue(), pair.getValue().longValue());
@@ -215,13 +248,13 @@
chartOut.close();
LOG.info("Coverage took " + GenomixJobConf.tock("drawStatistics") + "ms");
}
-
-
- static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild) throws IOException {
+
+ static void dumpGraph(JobConf conf, String inputGraph, String outputFasta, boolean followingBuild)
+ throws IOException {
LOG.info("Dumping graph to fasta...");
GenomixJobConf.tick("dumpGraph");
FileSystem dfs = FileSystem.get(conf);
-
+
// stream in the graph, counting elements as you go... this would be better as a hadoop job which aggregated... maybe into counters?
SequenceFile.Reader reader = null;
VKmerBytesWritable key = null;
diff --git a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
index e134ca2..489ed05 100644
--- a/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
+++ b/genomix/genomix-driver/src/main/java/edu/uci/ics/genomix/driver/GenomixDriver.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -116,6 +117,7 @@
public void runGenomix(GenomixJobConf conf) throws NumberFormatException, HyracksException, Exception {
KmerBytesWritable.setGlobalKmerLength(Integer.parseInt(conf.get(GenomixJobConf.KMER_LENGTH)));
+ DriverUtils.updateCCProperties(conf);
jobs = new ArrayList<PregelixJob>();
stepNum = 0;
boolean dump = false;
@@ -248,7 +250,7 @@
}
public static void main(String[] args) throws CmdLineException, NumberFormatException, HyracksException, Exception {
- String[] myArgs = { "-ip", "169.234.116.43", "-port", "3099",
+ String[] myArgs = {
"-kmerLength", "5", "-coresPerMachine", "2",
// "-saveIntermediateResults", "true",
// "-localInput", "../genomix-pregelix/data/input/reads/synthetic/",