add hadoop config before adding paths
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1982 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5257bcc..cb54227 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -12,7 +12,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -34,206 +33,188 @@
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private static String LIB = "lib";
- private JobGen jobGen;
- private PregelixJob job;
- private boolean profiling;
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private static String LIB = "lib";
+ private JobGen jobGen;
+ private PregelixJob job;
+ private boolean profiling;
- private String applicationName;
- private String PREGELIX_HOME = "PREGELIX_HOME";
- private IHyracksClientConnection hcc;
+ private String applicationName;
+ private String PREGELIX_HOME = "PREGELIX_HOME";
+ private IHyracksClientConnection hcc;
- private Class exampleClass;
+ private Class exampleClass;
- public Driver(Class exampleClass) {
- this.exampleClass = exampleClass;
- }
+ public Driver(Class exampleClass) {
+ this.exampleClass = exampleClass;
+ }
- @Override
- public void runJob(PregelixJob job, Plan planChoice, String ipAddress,
- int port, boolean profiling) throws HyracksException {
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
+ @Override
+ public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException {
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
- this.job = job;
+ this.job = job;
+ this.profiling = profiling;
+ try {
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(job);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(job);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(job);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(job);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(job);
+ }
- /** add hadoop configurations */
- String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
- Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
- this.job.getConfiguration().addResource(pathCore);
- Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
- this.job.getConfiguration().addResource(pathMapRed);
- Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
- this.job.getConfiguration().addResource(pathHDFS);
+ if (hcc == null)
+ hcc = new HyracksConnection(ipAddress, port);
+ ClusterConfig.loadClusterConfig(ipAddress, port);
- this.profiling = profiling;
- try {
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(job);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(job);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(job);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(job);
- break;
- default:
- jobGen = new JobGenInnerJoin(job);
- }
+ URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+ URL[] urls = classLoader.getURLs();
+ String jarFile = "";
+ for (URL url : urls)
+ if (url.toString().endsWith(".jar"))
+ jarFile = url.getPath();
- if (hcc == null)
- hcc = new HyracksConnection(ipAddress, port);
- ClusterConfig.loadClusterConfig(ipAddress, port);
+ installApplication(jarFile);
+ FileSystem dfs = FileSystem.get(job.getConfiguration());
+ dfs.delete(FileOutputFormat.getOutputPath(job), true);
- URLClassLoader classLoader = (URLClassLoader) exampleClass
- .getClassLoader();
- URL[] urls = classLoader.getURLs();
- String jarFile = "";
- for (URL url : urls)
- if (url.toString().endsWith(".jar"))
- jarFile = url.getPath();
+ runCreate(jobGen);
+ runDataLoad(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("data loading finished " + time + "ms");
+ int i = 1;
+ boolean terminate = false;
+ do {
+ start = System.currentTimeMillis();
+ runLoopBodyIteration(jobGen, i);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("iteration " + i + " finished " + time + "ms");
+ terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+ i++;
+ } while (!terminate);
- installApplication(jarFile);
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(FileOutputFormat.getOutputPath(job), true);
+ start = System.currentTimeMillis();
+ runHDFSWRite(jobGen);
+ runCleanup(jobGen);
+ destroyApplication(applicationName);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
- runCreate(jobGen);
- runDataLoad(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("data loading finished " + time + "ms");
- int i = 1;
- boolean terminate = false;
- do {
- start = System.currentTimeMillis();
- runLoopBodyIteration(jobGen, i);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("iteration " + i + " finished " + time + "ms");
- terminate = IterationUtils.readTerminationState(
- job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+ execute(treeCreateSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- start = System.currentTimeMillis();
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
- destroyApplication(applicationName);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
+ private void runDataLoad(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+ execute(bulkLoadJobSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
- execute(treeCreateSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+ try {
+ JobSpecification loopBody = jobGen.generateJob(iteration);
+ execute(loopBody);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runDataLoad(JobGen jobGen) throws Exception {
- try {
- JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- execute(bulkLoadJobSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runHDFSWRite(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+ execute(scanSortPrintJobSpec);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runLoopBodyIteration(JobGen jobGen, int iteration)
- throws Exception {
- try {
- JobSpecification loopBody = jobGen.generateJob(iteration);
- execute(loopBody);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runCleanup(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification[] cleanups = jobGen.generateCleanup();
+ runJobArray(cleanups);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
- private void runHDFSWRite(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen
- .scanIndexWriteGraph();
- execute(scanSortPrintJobSpec);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void runJobArray(JobSpecification[] jobs) throws Exception {
+ for (JobSpecification job : jobs) {
+ execute(job);
+ }
+ }
- private void runCleanup(JobGen jobGen) throws Exception {
- try {
- JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
- } catch (Exception e) {
- throw e;
- }
- }
+ private void execute(JobSpecification job) throws Exception {
+ JobId jobId = hcc.startJob(applicationName, job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
- private void runJobArray(JobSpecification[] jobs) throws Exception {
- for (JobSpecification job : jobs) {
- execute(job);
- }
- }
+ public void installApplication(String jarFile) throws Exception {
+ System.out.println(jarFile);
+ applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
+ String home = System.getProperty(PREGELIX_HOME);
+ if (home == null)
+ home = "./";
+ String libDir = home + LIB;
+ File dir = new File(libDir);
+ if (!dir.isDirectory()) {
+ throw new HyracksException(libDir + " is not a directory!");
+ }
+ System.out.println(dir.getAbsolutePath());
+ File[] libJars = dir.listFiles(new FileFilter("jar"));
+ Set<String> allJars = new TreeSet<String>();
+ allJars.add(jarFile);
+ for (File jar : libJars) {
+ allJars.add(jar.getAbsolutePath());
+ }
+ File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+ hcc.createApplication(applicationName, appZip);
+ }
- private void execute(JobSpecification job) throws Exception {
- JobId jobId = hcc.startJob(
- applicationName,
- job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
- .noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
-
- public void installApplication(String jarFile) throws Exception {
- System.out.println(jarFile);
- applicationName = job.getJobName()
- + new UUID(System.currentTimeMillis(), System.nanoTime());
- String home = System.getProperty(PREGELIX_HOME);
- if (home == null)
- home = "./";
- String libDir = home + LIB;
- File dir = new File(libDir);
- if (!dir.isDirectory()) {
- throw new HyracksException(libDir + " is not a directory!");
- }
- System.out.println(dir.getAbsolutePath());
- File[] libJars = dir.listFiles(new FileFilter("jar"));
- Set<String> allJars = new TreeSet<String>();
- allJars.add(jarFile);
- for (File jar : libJars) {
- allJars.add(jar.getAbsolutePath());
- }
- File appZip = Utilities.getHyracksArchive(applicationName, allJars);
- hcc.createApplication(applicationName, appZip);
- }
-
- public void destroyApplication(String jarFile) throws Exception {
- hcc.destroyApplication(applicationName);
- }
+ public void destroyApplication(String jarFile) throws Exception {
+ hcc.destroyApplication(applicationName);
+ }
}
class FileFilter implements FilenameFilter {
- private String ext;
+ private String ext;
- public FileFilter(String ext) {
- this.ext = "." + ext;
- }
+ public FileFilter(String ext) {
+ this.ext = "." + ext;
+ }
- public boolean accept(File dir, String name) {
- return name.endsWith(ext);
- }
+ public boolean accept(File dir, String name) {
+ return name.endsWith(ext);
+ }
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 57e2c71..f575f6d 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -17,60 +17,74 @@
public class Client {
- private static class Options {
- @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
- public String inputPaths;
+ private static class Options {
+ @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
+ public String inputPaths;
- @Option(name = "-outputpath", usage = "output path", required = true)
- public String outputPath;
+ @Option(name = "-outputpath", usage = "output path", required = true)
+ public String outputPath;
- @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
- public String ipAddress;
+ @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+ public String ipAddress;
- @Option(name = "-port", usage = "port of cluster controller", required = true)
- public int port;
+ @Option(name = "-port", usage = "port of cluster controller", required = true)
+ public int port;
- @Option(name = "-plan", usage = "query plan choice", required = true)
- public Plan planChoice;
+ @Option(name = "-plan", usage = "query plan choice", required = true)
+ public Plan planChoice;
- @Option(name = "-vnum", usage = "number of vertices", required = false)
- public long numVertices;
+ @Option(name = "-vnum", usage = "number of vertices", required = false)
+ public long numVertices;
- @Option(name = "-enum", usage = "number of vertices", required = false)
- public long numEdges;
+ @Option(name = "-enum", usage = "number of vertices", required = false)
+ public long numEdges;
- @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
- public long sourceId;
+ @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
+ public long sourceId;
- @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
- public long numIteration = -1;
+ @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+ public long numIteration = -1;
- @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
- public String profiling = "false";
- }
+ @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
+ public String profiling = "false";
+ }
- public static void run(String[] args, PregelixJob job) throws Exception {
- Options options = prepareJob(args, job);
- Driver driver = new Driver(Client.class);
- driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
- }
+ public static void run(String[] args, PregelixJob job) throws Exception {
+ Options options = prepareJob(args, job);
+ Driver driver = new Driver(Client.class);
+ driver.runJob(job, options.planChoice, options.ipAddress, options.port,
+ Boolean.parseBoolean(options.profiling));
+ }
- private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
+ private static Options prepareJob(String[] args, PregelixJob job)
+ throws CmdLineException, IOException {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
- String[] inputs = options.inputPaths.split(";");
- FileInputFormat.setInputPaths(job, inputs[0]);
- for (int i = 1; i < inputs.length; i++)
- FileInputFormat.addInputPaths(job, inputs[0]);
- FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
- job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
- job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
- if (options.numIteration > 0)
- job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
- return options;
- }
+ /** add hadoop configurations */
+ String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
+ Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
+ job.getConfiguration().addResource(pathCore);
+ Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
+ job.getConfiguration().addResource(pathMapRed);
+ Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
+ job.getConfiguration().addResource(pathHDFS);
+
+ String[] inputs = options.inputPaths.split(";");
+ FileInputFormat.setInputPaths(job, inputs[0]);
+ for (int i = 1; i < inputs.length; i++)
+ FileInputFormat.addInputPaths(job, inputs[0]);
+ FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE,
+ options.numVertices);
+ job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+ job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID,
+ options.sourceId);
+ if (options.numIteration > 0)
+ job.getConfiguration().setLong(PageRankVertex.ITERATIONS,
+ options.numIteration);
+ return options;
+ }
}