update startnc script and the driver
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1977 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7076f7f..5257bcc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -12,6 +12,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -33,188 +34,206 @@
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private static String LIB = "lib";
- private JobGen jobGen;
- private PregelixJob job;
- private boolean profiling;
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private static String LIB = "lib";
+ private JobGen jobGen;
+ private PregelixJob job;
+ private boolean profiling;
- private String applicationName;
- private String GIRAPH_HOME = "GIRAPH_HOME";
- private IHyracksClientConnection hcc;
+ private String applicationName;
+ private String PREGELIX_HOME = "PREGELIX_HOME";
+ private IHyracksClientConnection hcc;
- private Class exampleClass;
+ private Class exampleClass;
- public Driver(Class exampleClass) {
- this.exampleClass = exampleClass;
- }
+ public Driver(Class exampleClass) {
+ this.exampleClass = exampleClass;
+ }
- @Override
- public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
- throws HyracksException {
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
+ @Override
+ public void runJob(PregelixJob job, Plan planChoice, String ipAddress,
+ int port, boolean profiling) throws HyracksException {
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
- this.job = job;
- this.profiling = profiling;
- try {
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(job);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(job);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(job);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(job);
- break;
- default:
- jobGen = new JobGenInnerJoin(job);
- }
+ this.job = job;
- if (hcc == null)
- hcc = new HyracksConnection(ipAddress, port);
- ClusterConfig.loadClusterConfig(ipAddress, port);
+ /** add hadoop configurations */
+ String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
+ Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
+ this.job.getConfiguration().addResource(pathCore);
+ Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
+ this.job.getConfiguration().addResource(pathMapRed);
+ Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
+ this.job.getConfiguration().addResource(pathHDFS);
- URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
- URL[] urls = classLoader.getURLs();
- String jarFile = "";
- for (URL url : urls)
- if (url.toString().endsWith(".jar"))
- jarFile = url.getPath();
+ this.profiling = profiling;
+ try {
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(job);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(job);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(job);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(job);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(job);
+ }
- installApplication(jarFile);
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(FileOutputFormat.getOutputPath(job), true);
+ if (hcc == null)
+ hcc = new HyracksConnection(ipAddress, port);
+ ClusterConfig.loadClusterConfig(ipAddress, port);
- runCreate(jobGen);
- runDataLoad(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("data loading finished " + time + "ms");
- int i = 1;
- boolean terminate = false;
- do {
- start = System.currentTimeMillis();
- runLoopBodyIteration(jobGen, i);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("iteration " + i + " finished " + time + "ms");
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
+ URLClassLoader classLoader = (URLClassLoader) exampleClass
+ .getClassLoader();
+ URL[] urls = classLoader.getURLs();
+ String jarFile = "";
+ for (URL url : urls)
+ if (url.toString().endsWith(".jar"))
+ jarFile = url.getPath();
- start = System.currentTimeMillis();
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
- destroyApplication(applicationName);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
+ installApplication(jarFile);
+ FileSystem dfs = FileSystem.get(job.getConfiguration());
+ dfs.delete(FileOutputFormat.getOutputPath(job), true);
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
- execute(treeCreateSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ runCreate(jobGen);
+ runDataLoad(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("data loading finished " + time + "ms");
+ int i = 1;
+ boolean terminate = false;
+ do {
+ start = System.currentTimeMillis();
+ runLoopBodyIteration(jobGen, i);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("iteration " + i + " finished " + time + "ms");
+ terminate = IterationUtils.readTerminationState(
+ job.getConfiguration(), jobGen.getJobId());
+ i++;
+ } while (!terminate);
- private void runDataLoad(JobGen jobGen) throws Exception {
- try {
- JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- execute(bulkLoadJobSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ start = System.currentTimeMillis();
+ runHDFSWRite(jobGen);
+ runCleanup(jobGen);
+ destroyApplication(applicationName);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
- private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
- try {
- JobSpecification loopBody = jobGen.generateJob(iteration);
- execute(loopBody);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+ execute(treeCreateSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runHDFSWRite(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
- execute(scanSortPrintJobSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runDataLoad(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+ execute(bulkLoadJobSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runCleanup(JobGen jobGen) throws Exception {
- try {
- JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runLoopBodyIteration(JobGen jobGen, int iteration)
+ throws Exception {
+ try {
+ JobSpecification loopBody = jobGen.generateJob(iteration);
+ execute(loopBody);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runJobArray(JobSpecification[] jobs) throws Exception {
- for (JobSpecification job : jobs) {
- execute(job);
- }
- }
+ private void runHDFSWRite(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification scanSortPrintJobSpec = jobGen
+ .scanIndexWriteGraph();
+ execute(scanSortPrintJobSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void execute(JobSpecification job) throws Exception {
- JobId jobId = hcc.startJob(applicationName, job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
+ private void runCleanup(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification[] cleanups = jobGen.generateCleanup();
+ runJobArray(cleanups);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- public void installApplication(String jarFile) throws Exception {
- System.out.println(jarFile);
- applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
- String home = System.getProperty(GIRAPH_HOME);
- if (home == null)
- home = "./";
- String libDir = home + LIB;
- File dir = new File(libDir);
- if (!dir.isDirectory()) {
- throw new HyracksException(libDir + " is not a directory!");
- }
- System.out.println(dir.getAbsolutePath());
- File[] libJars = dir.listFiles(new FileFilter("jar"));
- Set<String> allJars = new TreeSet<String>();
- allJars.add(jarFile);
- for (File jar : libJars) {
- allJars.add(jar.getAbsolutePath());
- }
- File appZip = Utilities.getHyracksArchive(applicationName, allJars);
- hcc.createApplication(applicationName, appZip);
- }
+ private void runJobArray(JobSpecification[] jobs) throws Exception {
+ for (JobSpecification job : jobs) {
+ execute(job);
+ }
+ }
- public void destroyApplication(String jarFile) throws Exception {
- hcc.destroyApplication(applicationName);
- }
+ private void execute(JobSpecification job) throws Exception {
+ JobId jobId = hcc.startJob(
+ applicationName,
+ job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+ .noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+
+ public void installApplication(String jarFile) throws Exception {
+ System.out.println(jarFile);
+ applicationName = job.getJobName()
+ + new UUID(System.currentTimeMillis(), System.nanoTime());
+ String home = System.getProperty(PREGELIX_HOME);
+ if (home == null)
+ home = "./";
+ String libDir = home + LIB;
+ File dir = new File(libDir);
+ if (!dir.isDirectory()) {
+ throw new HyracksException(libDir + " is not a directory!");
+ }
+ System.out.println(dir.getAbsolutePath());
+ File[] libJars = dir.listFiles(new FileFilter("jar"));
+ Set<String> allJars = new TreeSet<String>();
+ allJars.add(jarFile);
+ for (File jar : libJars) {
+ allJars.add(jar.getAbsolutePath());
+ }
+ File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+ hcc.createApplication(applicationName, appZip);
+ }
+
+ public void destroyApplication(String jarFile) throws Exception {
+ hcc.destroyApplication(applicationName);
+ }
}
class FileFilter implements FilenameFilter {
- private String ext;
+ private String ext;
- public FileFilter(String ext) {
- this.ext = "." + ext;
- }
+ public FileFilter(String ext) {
+ this.ext = "." + ext;
+ }
- public boolean accept(File dir, String name) {
- return name.endsWith(ext);
- }
+ public boolean accept(File dir, String name) {
+ return name.endsWith(ext);
+ }
}
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6b00110..cb79692 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -50,5 +50,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-chmod -R 755 $HYRACKS_HOME
$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &