Update the startnc.sh script (drop recursive chmod of HYRACKS_HOME) and the Driver class (load Hadoop site configs from HADOOP_HOME, rename GIRAPH_HOME to PREGELIX_HOME)

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1977 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7076f7f..5257bcc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -12,6 +12,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -33,188 +34,206 @@
 
 @SuppressWarnings("rawtypes")
 public class Driver implements IDriver {
-    private static final Log LOG = LogFactory.getLog(Driver.class);
-    private static String LIB = "lib";
-    private JobGen jobGen;
-    private PregelixJob job;
-    private boolean profiling;
+	private static final Log LOG = LogFactory.getLog(Driver.class);
+	private static String LIB = "lib";
+	private JobGen jobGen;
+	private PregelixJob job;
+	private boolean profiling;
 
-    private String applicationName;
-    private String GIRAPH_HOME = "GIRAPH_HOME";
-    private IHyracksClientConnection hcc;
+	private String applicationName;
+	private String PREGELIX_HOME = "PREGELIX_HOME";
+	private IHyracksClientConnection hcc;
 
-    private Class exampleClass;
+	private Class exampleClass;
 
-    public Driver(Class exampleClass) {
-        this.exampleClass = exampleClass;
-    }
+	public Driver(Class exampleClass) {
+		this.exampleClass = exampleClass;
+	}
 
-    @Override
-    public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
-            throws HyracksException {
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
+	@Override
+	public void runJob(PregelixJob job, Plan planChoice, String ipAddress,
+			int port, boolean profiling) throws HyracksException {
+		LOG.info("job started");
+		long start = System.currentTimeMillis();
+		long end = start;
+		long time = 0;
 
-        this.job = job;
-        this.profiling = profiling;
-        try {
-            switch (planChoice) {
-                case INNER_JOIN:
-                    jobGen = new JobGenInnerJoin(job);
-                    break;
-                case OUTER_JOIN:
-                    jobGen = new JobGenOuterJoin(job);
-                    break;
-                case OUTER_JOIN_SORT:
-                    jobGen = new JobGenOuterJoinSort(job);
-                    break;
-                case OUTER_JOIN_SINGLE_SORT:
-                    jobGen = new JobGenOuterJoinSingleSort(job);
-                    break;
-                default:
-                    jobGen = new JobGenInnerJoin(job);
-            }
+		this.job = job;
 
-            if (hcc == null)
-                hcc = new HyracksConnection(ipAddress, port);
-            ClusterConfig.loadClusterConfig(ipAddress, port);
+		/** add hadoop configurations */
+		String hadoopPath = System.getProperty("HADOOP_HOME", "/hadoop");
+		Path pathCore = new Path(hadoopPath + "/conf/core-site.xml");
+		this.job.getConfiguration().addResource(pathCore);
+		Path pathMapRed = new Path(hadoopPath + "/conf/mapred-site.xml");
+		this.job.getConfiguration().addResource(pathMapRed);
+		Path pathHDFS = new Path(hadoopPath + "/conf/hdfs-site.xml");
+		this.job.getConfiguration().addResource(pathHDFS);
 
-            URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
-            URL[] urls = classLoader.getURLs();
-            String jarFile = "";
-            for (URL url : urls)
-                if (url.toString().endsWith(".jar"))
-                    jarFile = url.getPath();
+		this.profiling = profiling;
+		try {
+			switch (planChoice) {
+			case INNER_JOIN:
+				jobGen = new JobGenInnerJoin(job);
+				break;
+			case OUTER_JOIN:
+				jobGen = new JobGenOuterJoin(job);
+				break;
+			case OUTER_JOIN_SORT:
+				jobGen = new JobGenOuterJoinSort(job);
+				break;
+			case OUTER_JOIN_SINGLE_SORT:
+				jobGen = new JobGenOuterJoinSingleSort(job);
+				break;
+			default:
+				jobGen = new JobGenInnerJoin(job);
+			}
 
-            installApplication(jarFile);
-            FileSystem dfs = FileSystem.get(job.getConfiguration());
-            dfs.delete(FileOutputFormat.getOutputPath(job), true);
+			if (hcc == null)
+				hcc = new HyracksConnection(ipAddress, port);
+			ClusterConfig.loadClusterConfig(ipAddress, port);
 
-            runCreate(jobGen);
-            runDataLoad(jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("data loading finished " + time + "ms");
-            int i = 1;
-            boolean terminate = false;
-            do {
-                start = System.currentTimeMillis();
-                runLoopBodyIteration(jobGen, i);
-                end = System.currentTimeMillis();
-                time = end - start;
-                LOG.info("iteration " + i + " finished " + time + "ms");
-                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
-                i++;
-            } while (!terminate);
+			URLClassLoader classLoader = (URLClassLoader) exampleClass
+					.getClassLoader();
+			URL[] urls = classLoader.getURLs();
+			String jarFile = "";
+			for (URL url : urls)
+				if (url.toString().endsWith(".jar"))
+					jarFile = url.getPath();
 
-            start = System.currentTimeMillis();
-            runHDFSWRite(jobGen);
-            runCleanup(jobGen);
-            destroyApplication(applicationName);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("result writing finished " + time + "ms");
-            LOG.info("job finished");
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
+			installApplication(jarFile);
+			FileSystem dfs = FileSystem.get(job.getConfiguration());
+			dfs.delete(FileOutputFormat.getOutputPath(job), true);
 
-    private void runCreate(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
-            execute(treeCreateSpec);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+			runCreate(jobGen);
+			runDataLoad(jobGen);
+			end = System.currentTimeMillis();
+			time = end - start;
+			LOG.info("data loading finished " + time + "ms");
+			int i = 1;
+			boolean terminate = false;
+			do {
+				start = System.currentTimeMillis();
+				runLoopBodyIteration(jobGen, i);
+				end = System.currentTimeMillis();
+				time = end - start;
+				LOG.info("iteration " + i + " finished " + time + "ms");
+				terminate = IterationUtils.readTerminationState(
+						job.getConfiguration(), jobGen.getJobId());
+				i++;
+			} while (!terminate);
 
-    private void runDataLoad(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
-            execute(bulkLoadJobSpec);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+			start = System.currentTimeMillis();
+			runHDFSWRite(jobGen);
+			runCleanup(jobGen);
+			destroyApplication(applicationName);
+			end = System.currentTimeMillis();
+			time = end - start;
+			LOG.info("result writing finished " + time + "ms");
+			LOG.info("job finished");
+		} catch (Exception e) {
+			throw new HyracksException(e);
+		}
+	}
 
-    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
-        try {
-            JobSpecification loopBody = jobGen.generateJob(iteration);
-            execute(loopBody);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+	private void runCreate(JobGen jobGen) throws Exception {
+		try {
+			JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+			execute(treeCreateSpec);
+		} catch (Exception e) {
+			throw e;
+		}
+	}
 
-    private void runHDFSWRite(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
-            execute(scanSortPrintJobSpec);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+	private void runDataLoad(JobGen jobGen) throws Exception {
+		try {
+			JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+			execute(bulkLoadJobSpec);
+		} catch (Exception e) {
+			throw e;
+		}
+	}
 
-    private void runCleanup(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification[] cleanups = jobGen.generateCleanup();
-            runJobArray(cleanups);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+	private void runLoopBodyIteration(JobGen jobGen, int iteration)
+			throws Exception {
+		try {
+			JobSpecification loopBody = jobGen.generateJob(iteration);
+			execute(loopBody);
+		} catch (Exception e) {
+			throw e;
+		}
+	}
 
-    private void runJobArray(JobSpecification[] jobs) throws Exception {
-        for (JobSpecification job : jobs) {
-            execute(job);
-        }
-    }
+	private void runHDFSWRite(JobGen jobGen) throws Exception {
+		try {
+			JobSpecification scanSortPrintJobSpec = jobGen
+					.scanIndexWriteGraph();
+			execute(scanSortPrintJobSpec);
+		} catch (Exception e) {
+			throw e;
+		}
+	}
 
-    private void execute(JobSpecification job) throws Exception {
-        JobId jobId = hcc.startJob(applicationName, job,
-                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
-    }
+	private void runCleanup(JobGen jobGen) throws Exception {
+		try {
+			JobSpecification[] cleanups = jobGen.generateCleanup();
+			runJobArray(cleanups);
+		} catch (Exception e) {
+			throw e;
+		}
+	}
 
-    public void installApplication(String jarFile) throws Exception {
-        System.out.println(jarFile);
-        applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
-        String home = System.getProperty(GIRAPH_HOME);
-        if (home == null)
-            home = "./";
-        String libDir = home + LIB;
-        File dir = new File(libDir);
-        if (!dir.isDirectory()) {
-            throw new HyracksException(libDir + " is not a directory!");
-        }
-        System.out.println(dir.getAbsolutePath());
-        File[] libJars = dir.listFiles(new FileFilter("jar"));
-        Set<String> allJars = new TreeSet<String>();
-        allJars.add(jarFile);
-        for (File jar : libJars) {
-            allJars.add(jar.getAbsolutePath());
-        }
-        File appZip = Utilities.getHyracksArchive(applicationName, allJars);
-        hcc.createApplication(applicationName, appZip);
-    }
+	private void runJobArray(JobSpecification[] jobs) throws Exception {
+		for (JobSpecification job : jobs) {
+			execute(job);
+		}
+	}
 
-    public void destroyApplication(String jarFile) throws Exception {
-        hcc.destroyApplication(applicationName);
-    }
+	private void execute(JobSpecification job) throws Exception {
+		JobId jobId = hcc.startJob(
+				applicationName,
+				job,
+				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+						.noneOf(JobFlag.class));
+		hcc.waitForCompletion(jobId);
+	}
+
+	public void installApplication(String jarFile) throws Exception {
+		System.out.println(jarFile);
+		applicationName = job.getJobName()
+				+ new UUID(System.currentTimeMillis(), System.nanoTime());
+		String home = System.getProperty(PREGELIX_HOME);
+		if (home == null)
+			home = "./";
+		String libDir = home + LIB;
+		File dir = new File(libDir);
+		if (!dir.isDirectory()) {
+			throw new HyracksException(libDir + " is not a directory!");
+		}
+		System.out.println(dir.getAbsolutePath());
+		File[] libJars = dir.listFiles(new FileFilter("jar"));
+		Set<String> allJars = new TreeSet<String>();
+		allJars.add(jarFile);
+		for (File jar : libJars) {
+			allJars.add(jar.getAbsolutePath());
+		}
+		File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+		hcc.createApplication(applicationName, appZip);
+	}
+
+	public void destroyApplication(String jarFile) throws Exception {
+		hcc.destroyApplication(applicationName);
+	}
 
 }
 
 class FileFilter implements FilenameFilter {
-    private String ext;
+	private String ext;
 
-    public FileFilter(String ext) {
-        this.ext = "." + ext;
-    }
+	public FileFilter(String ext) {
+		this.ext = "." + ext;
+	}
 
-    public boolean accept(File dir, String name) {
-        return name.endsWith(ext);
-    }
+	public boolean accept(File dir, String name) {
+		return name.endsWith(ext);
+	}
 }
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6b00110..cb79692 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -50,5 +50,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-chmod -R 755 $HYRACKS_HOME
 $HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &