add hadoop config before adding input/output paths (moved from Driver.runJob into Client.prepareJob)
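
FileInputFormat.setInputPaths qualifies each input path against the default
FileSystem at the time it is called, so the Hadoop *-site.xml resources must
already be in the job Configuration when the paths are registered; otherwise
a scheme-less path can silently resolve against the local file system instead
of HDFS. A minimal sketch of the call order prepareJob() now follows (variable
and path names are illustrative placeholders; "/hadoop" is the same
HADOOP_HOME fallback the code uses):

    // load the cluster configuration first, so fs.defaultFS is known
    String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
    Configuration conf = job.getConfiguration();
    conf.addResource(new Path(hadoopPath + "/conf/core-site.xml"));
    conf.addResource(new Path(hadoopPath + "/conf/mapred-site.xml"));
    conf.addResource(new Path(hadoopPath + "/conf/hdfs-site.xml"));
    // only now register paths, which get qualified against the loaded fs
    FileInputFormat.setInputPaths(job, new Path("/webmap"));   // hypothetical input
    FileOutputFormat.setOutputPath(job, new Path("/result"));  // hypothetical output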

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1982 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 5257bcc..cb54227 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -12,7 +12,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -34,206 +33,188 @@
 
 @SuppressWarnings("rawtypes")
 public class Driver implements IDriver {
-	private static final Log LOG = LogFactory.getLog(Driver.class);
-	private static String LIB = "lib";
-	private JobGen jobGen;
-	private PregelixJob job;
-	private boolean profiling;
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private static String LIB = "lib";
+    private JobGen jobGen;
+    private PregelixJob job;
+    private boolean profiling;
 
-	private String applicationName;
-	private String PREGELIX_HOME = "PREGELIX_HOME";
-	private IHyracksClientConnection hcc;
+    private String applicationName;
+    private String PREGELIX_HOME = "PREGELIX_HOME";
+    private IHyracksClientConnection hcc;
 
-	private Class exampleClass;
+    private Class exampleClass;
 
-	public Driver(Class exampleClass) {
-		this.exampleClass = exampleClass;
-	}
+    public Driver(Class exampleClass) {
+        this.exampleClass = exampleClass;
+    }
 
-	@Override
-	public void runJob(PregelixJob job, Plan planChoice, String ipAddress,
-			int port, boolean profiling) throws HyracksException {
-		LOG.info("job started");
-		long start = System.currentTimeMillis();
-		long end = start;
-		long time = 0;
+    @Override
+    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+            throws HyracksException {
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
 
-		this.job = job;
+        this.job = job;
+        this.profiling = profiling;
+        try {
+            switch (planChoice) {
+                case INNER_JOIN:
+                    jobGen = new JobGenInnerJoin(job);
+                    break;
+                case OUTER_JOIN:
+                    jobGen = new JobGenOuterJoin(job);
+                    break;
+                case OUTER_JOIN_SORT:
+                    jobGen = new JobGenOuterJoinSort(job);
+                    break;
+                case OUTER_JOIN_SINGLE_SORT:
+                    jobGen = new JobGenOuterJoinSingleSort(job);
+                    break;
+                default:
+                    jobGen = new JobGenInnerJoin(job);
+            }
 
-		/** add hadoop configurations */
-		String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
-		Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
-		this.job.getConfiguration().addResource(pathCore);
-		Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
-		this.job.getConfiguration().addResource(pathMapRed);
-		Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
-		this.job.getConfiguration().addResource(pathHDFS);
+            if (hcc == null)
+                hcc = new HyracksConnection(ipAddress, port);
+            ClusterConfig.loadClusterConfig(ipAddress, port);
 
-		this.profiling = profiling;
-		try {
-			switch (planChoice) {
-			case INNER_JOIN:
-				jobGen = new JobGenInnerJoin(job);
-				break;
-			case OUTER_JOIN:
-				jobGen = new JobGenOuterJoin(job);
-				break;
-			case OUTER_JOIN_SORT:
-				jobGen = new JobGenOuterJoinSort(job);
-				break;
-			case OUTER_JOIN_SINGLE_SORT:
-				jobGen = new JobGenOuterJoinSingleSort(job);
-				break;
-			default:
-				jobGen = new JobGenInnerJoin(job);
-			}
+            URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+            URL[] urls = classLoader.getURLs();
+            String jarFile = "";
+            for (URL url : urls)
+                if (url.toString().endsWith(".jar"))
+                    jarFile = url.getPath();
 
-			if (hcc == null)
-				hcc = new HyracksConnection(ipAddress, port);
-			ClusterConfig.loadClusterConfig(ipAddress, port);
+            installApplication(jarFile);
+            FileSystem dfs = FileSystem.get(job.getConfiguration());
+            dfs.delete(FileOutputFormat.getOutputPath(job), true);
 
-			URLClassLoader classLoader = (URLClassLoader) exampleClass
-					.getClassLoader();
-			URL[] urls = classLoader.getURLs();
-			String jarFile = "";
-			for (URL url : urls)
-				if (url.toString().endsWith(".jar"))
-					jarFile = url.getPath();
+            runCreate(jobGen);
+            runDataLoad(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("data loading finished in " + time + "ms");
+            int i = 1;
+            boolean terminate = false;
+            do {
+                start = System.currentTimeMillis();
+                runLoopBodyIteration(jobGen, i);
+                end = System.currentTimeMillis();
+                time = end - start;
+                LOG.info("iteration " + i + " finished in " + time + "ms");
+                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+                i++;
+            } while (!terminate);
 
-			installApplication(jarFile);
-			FileSystem dfs = FileSystem.get(job.getConfiguration());
-			dfs.delete(FileOutputFormat.getOutputPath(job), true);
+            start = System.currentTimeMillis();
+            runHDFSWrite(jobGen);
+            runCleanup(jobGen);
+            destroyApplication(applicationName);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished in " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
 
-			runCreate(jobGen);
-			runDataLoad(jobGen);
-			end = System.currentTimeMillis();
-			time = end - start;
-			LOG.info("data loading finished " + time + "ms");
-			int i = 1;
-			boolean terminate = false;
-			do {
-				start = System.currentTimeMillis();
-				runLoopBodyIteration(jobGen, i);
-				end = System.currentTimeMillis();
-				time = end - start;
-				LOG.info("iteration " + i + " finished " + time + "ms");
-				terminate = IterationUtils.readTerminationState(
-						job.getConfiguration(), jobGen.getJobId());
-				i++;
-			} while (!terminate);
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+            execute(treeCreateSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
 
-			start = System.currentTimeMillis();
-			runHDFSWRite(jobGen);
-			runCleanup(jobGen);
-			destroyApplication(applicationName);
-			end = System.currentTimeMillis();
-			time = end - start;
-			LOG.info("result writing finished " + time + "ms");
-			LOG.info("job finished");
-		} catch (Exception e) {
-			throw new HyracksException(e);
-		}
-	}
+    private void runDataLoad(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+            execute(bulkLoadJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
 
-	private void runCreate(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
-			execute(treeCreateSpec);
-		} catch (Exception e) {
-			throw e;
-		}
-	}
+    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+        try {
+            JobSpecification loopBody = jobGen.generateJob(iteration);
+            execute(loopBody);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
 
-	private void runDataLoad(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
-			execute(bulkLoadJobSpec);
-		} catch (Exception e) {
-			throw e;
-		}
-	}
+    private void runHDFSWrite(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+            execute(scanSortPrintJobSpec);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
 
-	private void runLoopBodyIteration(JobGen jobGen, int iteration)
-			throws Exception {
-		try {
-			JobSpecification loopBody = jobGen.generateJob(iteration);
-			execute(loopBody);
-		} catch (Exception e) {
-			throw e;
-		}
-	}
+    private void runCleanup(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification[] cleanups = jobGen.generateCleanup();
+            runJobArray(cleanups);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
 
-	private void runHDFSWRite(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification scanSortPrintJobSpec = jobGen
-					.scanIndexWriteGraph();
-			execute(scanSortPrintJobSpec);
-		} catch (Exception e) {
-			throw e;
-		}
-	}
+    private void runJobArray(JobSpecification[] jobs) throws Exception {
+        for (JobSpecification job : jobs) {
+            execute(job);
+        }
+    }
 
-	private void runCleanup(JobGen jobGen) throws Exception {
-		try {
-			JobSpecification[] cleanups = jobGen.generateCleanup();
-			runJobArray(cleanups);
-		} catch (Exception e) {
-			throw e;
-		}
-	}
+    private void execute(JobSpecification job) throws Exception {
+        JobId jobId = hcc.startJob(applicationName, job,
+                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
 
-	private void runJobArray(JobSpecification[] jobs) throws Exception {
-		for (JobSpecification job : jobs) {
-			execute(job);
-		}
-	}
+    public void installApplication(String jarFile) throws Exception {
+        LOG.info("deploying jar " + jarFile);
+        applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
+        String home = System.getProperty(PREGELIX_HOME);
+        if (home == null)
+            home = "./";
+        String libDir = home + LIB;
+        File dir = new File(libDir);
+        if (!dir.isDirectory()) {
+            throw new HyracksException(libDir + " is not a directory!");
+        }
+        LOG.info("jar lib directory " + dir.getAbsolutePath());
+        File[] libJars = dir.listFiles(new FileFilter("jar"));
+        Set<String> allJars = new TreeSet<String>();
+        allJars.add(jarFile);
+        for (File jar : libJars) {
+            allJars.add(jar.getAbsolutePath());
+        }
+        File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+        hcc.createApplication(applicationName, appZip);
+    }
 
-	private void execute(JobSpecification job) throws Exception {
-		JobId jobId = hcc.startJob(
-				applicationName,
-				job,
-				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
-						.noneOf(JobFlag.class));
-		hcc.waitForCompletion(jobId);
-	}
-
-	public void installApplication(String jarFile) throws Exception {
-		System.out.println(jarFile);
-		applicationName = job.getJobName()
-				+ new UUID(System.currentTimeMillis(), System.nanoTime());
-		String home = System.getProperty(PREGELIX_HOME);
-		if (home == null)
-			home = "./";
-		String libDir = home + LIB;
-		File dir = new File(libDir);
-		if (!dir.isDirectory()) {
-			throw new HyracksException(libDir + " is not a directory!");
-		}
-		System.out.println(dir.getAbsolutePath());
-		File[] libJars = dir.listFiles(new FileFilter("jar"));
-		Set<String> allJars = new TreeSet<String>();
-		allJars.add(jarFile);
-		for (File jar : libJars) {
-			allJars.add(jar.getAbsolutePath());
-		}
-		File appZip = Utilities.getHyracksArchive(applicationName, allJars);
-		hcc.createApplication(applicationName, appZip);
-	}
-
-	public void destroyApplication(String jarFile) throws Exception {
-		hcc.destroyApplication(applicationName);
-	}
+    public void destroyApplication(String appName) throws Exception {
+        hcc.destroyApplication(applicationName);
+    }
 
 }
 
 class FileFilter implements FilenameFilter {
-	private String ext;
+    private String ext;
 
-	public FileFilter(String ext) {
-		this.ext = "." + ext;
-	}
+    public FileFilter(String ext) {
+        this.ext = "." + ext;
+    }
 
-	public boolean accept(File dir, String name) {
-		return name.endsWith(ext);
-	}
+    public boolean accept(File dir, String name) {
+        return name.endsWith(ext);
+    }
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index 57e2c71..f575f6d 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -17,60 +17,74 @@
 
 public class Client {
 
-    private static class Options {
-        @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
-        public String inputPaths;
+	private static class Options {
+		@Option(name = "-inputpaths", usage = "semicolon separated input paths", required = true)
+		public String inputPaths;
 
-        @Option(name = "-outputpath", usage = "output path", required = true)
-        public String outputPath;
+		@Option(name = "-outputpath", usage = "output path", required = true)
+		public String outputPath;
 
-        @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
-        public String ipAddress;
+		@Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+		public String ipAddress;
 
-        @Option(name = "-port", usage = "port of cluster controller", required = true)
-        public int port;
+		@Option(name = "-port", usage = "port of cluster controller", required = true)
+		public int port;
 
-        @Option(name = "-plan", usage = "query plan choice", required = true)
-        public Plan planChoice;
+		@Option(name = "-plan", usage = "query plan choice", required = true)
+		public Plan planChoice;
 
-        @Option(name = "-vnum", usage = "number of vertices", required = false)
-        public long numVertices;
+		@Option(name = "-vnum", usage = "number of vertices", required = false)
+		public long numVertices;
 
-        @Option(name = "-enum", usage = "number of vertices", required = false)
-        public long numEdges;
+		@Option(name = "-enum", usage = "number of edges", required = false)
+		public long numEdges;
 
-        @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
-        public long sourceId;
+		@Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
+		public long sourceId;
 
-        @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
-        public long numIteration = -1;
+		@Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+		public long numIteration = -1;
 
-        @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
-        public String profiling = "false";
-    }
+		@Option(name = "-runtime-profiling", usage = "whether to do runtime profiling", required = false)
+		public String profiling = "false";
+	}
 
-    public static void run(String[] args, PregelixJob job) throws Exception {
-        Options options = prepareJob(args, job);
-        Driver driver = new Driver(Client.class);
-        driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
-    }
+	public static void run(String[] args, PregelixJob job) throws Exception {
+		Options options = prepareJob(args, job);
+		Driver driver = new Driver(Client.class);
+		driver.runJob(job, options.planChoice, options.ipAddress, options.port,
+				Boolean.parseBoolean(options.profiling));
+	}
 
-    private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
+	private static Options prepareJob(String[] args, PregelixJob job)
+			throws CmdLineException, IOException {
+		Options options = new Options();
+		CmdLineParser parser = new CmdLineParser(options);
+		parser.parseArgument(args);
 
-        String[] inputs = options.inputPaths.split(";");
-        FileInputFormat.setInputPaths(job, inputs[0]);
-        for (int i = 1; i < inputs.length; i++)
-            FileInputFormat.addInputPaths(job, inputs[0]);
-        FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
-        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
-        job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
-        job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
-        if (options.numIteration > 0)
-            job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
-        return options;
-    }
+		/** add hadoop configurations before any input/output path is set */
+		String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
+		Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
+		job.getConfiguration().addResource(pathCore);
+		Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
+		job.getConfiguration().addResource(pathMapRed);
+		Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
+		job.getConfiguration().addResource(pathHDFS);
+
+		String[] inputs = options.inputPaths.split(";");
+		FileInputFormat.setInputPaths(job, inputs[0]);
+		for (int i = 1; i < inputs.length; i++)
+			FileInputFormat.addInputPaths(job, inputs[i]);
+		FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+		job.getConfiguration().setLong(PregelixJob.NUM_VERTICE,
+				options.numVertices);
+		job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+		job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID,
+				options.sourceId);
+		if (options.numIteration > 0)
+			job.getConfiguration().setLong(PageRankVertex.ITERATIONS,
+					options.numIteration);
+		return options;
+	}
 
 }
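
For reference, a sketch of how prepareJob() is exercised end to end; the
vertex class, addresses, and paths below are hypothetical placeholders, and
multiple inputs would be joined with ';' to match the split in prepareJob():

    // hypothetical launcher; assumes the usual PregelixJob setup
    PregelixJob job = new PregelixJob("PageRank");
    job.setVertexClass(PageRankVertex.class);
    Client.run(new String[] { "-inputpaths", "/webmap", "-outputpath", "/result",
            "-ip", "10.0.0.1", "-port", "3099", "-plan", "OUTER_JOIN",
            "-vnum", "20", "-num-iteration", "5" }, job);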