Merged hyracks_dev_next -r 1287 into trunk

In hyracks-hadoop-compat this merge removes the checked-in Eclipse metadata
(.classpath, .project, .settings), bumps the module and its
hyracks-dataflow-hadoop dependency from 0.1.9-SNAPSHOT to 0.2.0-SNAPSHOT,
switches the compatibility client from HyracksRMIConnection to
HyracksConnection, tracks jobs with JobId instead of java.util.UUID, and
follows the rename of MToNHashPartitioningConnectorDescriptor to
MToNPartitioningConnectorDescriptor in HadoopAdapter.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hadoop-compat/.classpath b/hyracks/hyracks-hadoop-compat/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-hadoop-compat/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-hadoop-compat/.project b/hyracks/hyracks-hadoop-compat/.project
deleted file mode 100644
index 7d50383..0000000
--- a/hyracks/hyracks-hadoop-compat/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>hyracks-hadoop-compat</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.maven.ide.eclipse.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.jdt.core.javanature</nature>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
- </natures>
-</projectDescription>
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index dfac000..0000000
--- a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri May 20 19:34:05 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index e03a9fc..0000000
--- a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Tue Oct 19 11:05:30 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
index 6a6d921..2de7e04 100644
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-hadoop-compat</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</parent>
<build>
@@ -79,7 +79,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-hadoop</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index a363221..eadf679 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -4,10 +4,10 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
@@ -15,77 +15,73 @@
public class HyracksClient {
- private static HyracksRMIConnection connection;
- private static final String jobProfilingKey = "jobProfilingKey";
- Set<String> systemLibs;
+ private static HyracksConnection connection;
+ private static final String jobProfilingKey = "jobProfilingKey";
+ Set<String> systemLibs;
- public HyracksClient(Properties clusterProperties) throws Exception {
- initialize(clusterProperties);
- }
+ public HyracksClient(Properties clusterProperties) throws Exception {
+ initialize(clusterProperties);
+ }
- private void initialize(Properties properties) throws Exception {
- String clusterController = (String) properties
- .get(ConfigurationConstants.clusterControllerHost);
- connection = new HyracksRMIConnection(clusterController, 1099);
- systemLibs = new HashSet<String>();
- for (String systemLib : ConfigurationConstants.systemLibs) {
- String systemLibPath = properties.getProperty(systemLib);
- if (systemLibPath != null) {
- systemLibs.add(systemLibPath);
- }
- }
- }
+ private void initialize(Properties properties) throws Exception {
+ String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+ connection = new HyracksConnection(clusterController, 1098);
+ systemLibs = new HashSet<String>();
+ for (String systemLib : ConfigurationConstants.systemLibs) {
+ String systemLibPath = properties.getProperty(systemLib);
+ if (systemLibPath != null) {
+ systemLibs.add(systemLibPath);
+ }
+ }
+ }
- public HyracksClient(String clusterConf, char delimiter) throws Exception {
- Properties properties = Utilities.getProperties(clusterConf, delimiter);
- initialize(properties);
- }
+ public HyracksClient(String clusterConf, char delimiter) throws Exception {
+ Properties properties = Utilities.getProperties(clusterConf, delimiter);
+ initialize(properties);
+ }
- private Set<String> getRequiredLibs(Set<String> userLibs) {
- Set<String> requiredLibs = new HashSet<String>();
- for (String systemLib : systemLibs) {
- requiredLibs.add(systemLib);
- }
- for (String userLib : userLibs) {
- requiredLibs.add(userLib);
- }
- return requiredLibs;
- }
+ private Set<String> getRequiredLibs(Set<String> userLibs) {
+ Set<String> requiredLibs = new HashSet<String>();
+ for (String systemLib : systemLibs) {
+ requiredLibs.add(systemLib);
+ }
+ for (String userLib : userLibs) {
+ requiredLibs.add(userLib);
+ }
+ return requiredLibs;
+ }
- public JobStatus getJobStatus(UUID jobId) throws Exception {
- return connection.getJobStatus(jobId);
- }
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ return connection.getJobStatus(jobId);
+ }
- private void createApplication(String applicationName, Set<String> userLibs)
- throws Exception {
- connection.createApplication(applicationName, Utilities
- .getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
- }
+ private void createApplication(String applicationName, Set<String> userLibs) throws Exception {
+ connection.createApplication(applicationName,
+ Utilities.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+ }
- public HyracksRunningJob submitJob(String applicationName,
- JobSpecification spec) throws Exception {
- String jobProfilingVal = System.getenv(jobProfilingKey);
- boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
- UUID jobId;
- if (doProfiling) {
- System.out.println("PROFILING");
- jobId = connection.createJob(applicationName, spec, EnumSet
- .of(JobFlag.PROFILE_RUNTIME));
- } else {
- jobId = connection.createJob(applicationName, spec);
- }
- connection.start(jobId);
- HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
- return runningJob;
- }
+ public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception {
+ String jobProfilingVal = System.getenv(jobProfilingKey);
+ boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+ JobId jobId;
+ if (doProfiling) {
+ System.out.println("PROFILING");
+ jobId = connection.createJob(applicationName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ } else {
+ jobId = connection.createJob(applicationName, spec);
+ }
+ connection.start(jobId);
+ HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+ return runningJob;
+ }
- public HyracksRunningJob submitJob(String applicationName,
- JobSpecification spec, Set<String> userLibs) throws Exception {
- createApplication(applicationName, userLibs);
- return submitJob(applicationName, spec);
- }
+ public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs)
+ throws Exception {
+ createApplication(applicationName, userLibs);
+ return submitJob(applicationName, spec);
+ }
- public void waitForCompleton(UUID jobId) throws Exception {
- connection.waitForCompletion(jobId);
- }
+ public void waitForCompleton(JobId jobId) throws Exception {
+ connection.waitForCompletion(jobId);
+ }
}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
index 8470e12..7c650ec 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.hadoop.compat.client;
import java.io.IOException;
-import java.util.UUID;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobID;
@@ -9,19 +8,20 @@
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class HyracksRunningJob implements RunningJob {
- UUID jobId;
+ JobId jobId;
JobSpecification spec;
HyracksClient hyracksClient;
- public UUID getJobId() {
+ public JobId getJobId() {
return jobId;
}
- public void setJobId(UUID jobId) {
+ public void setJobId(JobId jobId) {
this.jobId = jobId;
}
@@ -33,7 +33,7 @@
this.spec = spec;
}
- public HyracksRunningJob(UUID jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
+ public HyracksRunningJob(JobId jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
this.spec = jobSpec;
this.jobId = jobId;
this.hyracksClient = hyracksClient;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 37f4d34..bf09bb0 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -6,15 +6,16 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
-import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.kohsuke.args4j.CmdLineParser;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
@@ -22,187 +23,168 @@
import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
public class CompatibilityLayer {
- HyracksClient hyracksClient;
- DCacheHandler dCacheHander = null;
- Properties clusterConf;
- HadoopAdapter hadoopAdapter;
+ HyracksClient hyracksClient;
+ DCacheHandler dCacheHander = null;
+ Properties clusterConf;
+ HadoopAdapter hadoopAdapter;
- private static char configurationFileDelimiter = '=';
- private static final String dacheKeyPrefix = "dcache.key";
+ private static char configurationFileDelimiter = '=';
+ private static final String dacheKeyPrefix = "dcache.key";
- public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
- initialize(clConfig);
- }
+ public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+ initialize(clConfig);
+ }
- private void initialize(CompatibilityConfig clConfig) throws Exception {
- clusterConf = Utilities.getProperties(clConfig.clusterConf,
- configurationFileDelimiter);
- hadoopAdapter = new HadoopAdapter(clusterConf
- .getProperty(ConfigurationConstants.namenodeURL));
- hyracksClient = new HyracksClient(clusterConf);
- dCacheHander = new DCacheHandler(clusterConf
- .getProperty(ConfigurationConstants.dcacheServerConfiguration));
- }
+ private void initialize(CompatibilityConfig clConfig) throws Exception {
+ clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+ hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
+ hyracksClient = new HyracksClient(clusterConf);
+ dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+ }
- public HyracksRunningJob submitJob(JobConf conf,Set<String> userLibs) throws Exception {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- jobConfs.add(conf);
- String applicationName = conf.getJobName() + System.currentTimeMillis();
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec, userLibs);
- return hyracksRunningJob;
- }
-
- public HyracksRunningJob submitJobs(String applicationName,
- String[] jobFiles, Set<String> userLibs) throws Exception {
- List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
- populateDCache(jobFiles[0]);
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec, userLibs);
- return hyracksRunningJob;
- }
+ public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ jobConfs.add(conf);
+ String applicationName = conf.getJobName() + System.currentTimeMillis();
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
- public HyracksRunningJob submitJobs(String applicationName,
- String[] jobFiles) throws Exception {
- List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
- populateDCache(jobFiles[0]);
- JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
- HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
- applicationName, spec);
- return hyracksRunningJob;
- }
+ public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
+ throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+ return hyracksRunningJob;
+ }
- private void populateDCache(String jobFile) throws IOException {
- Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
- String tempDir = "/tmp";
- if (dcacheTasks.size() > 0) {
- for (String key : dcacheTasks.keySet()) {
- String destPath = tempDir + "/" + key
- + System.currentTimeMillis();
- hadoopAdapter.getHDFSClient().copyToLocalFile(
- new Path(dcacheTasks.get(key)), new Path(destPath));
- System.out.println(" source :" + dcacheTasks.get(key));
- System.out.println(" dest :" + destPath);
- System.out.println(" key :" + key);
- System.out.println(" value :" + destPath);
- dCacheHander.put(key, destPath);
- }
- }
- }
+ public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ populateDCache(jobFiles[0]);
+ JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+ HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec);
+ return hyracksRunningJob;
+ }
- private String getApplicationNameForHadoopJob(JobConf jobConf) {
- String jar = jobConf.getJar();
- if (jar != null) {
- return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
- .lastIndexOf("/") + 1 : 0);
- } else {
- return "" + System.currentTimeMillis();
- }
- }
+ private void populateDCache(String jobFile) throws IOException {
+ Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+ String tempDir = "/tmp";
+ if (dcacheTasks.size() > 0) {
+ for (String key : dcacheTasks.keySet()) {
+ String destPath = tempDir + "/" + key + System.currentTimeMillis();
+ hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+ System.out.println(" source :" + dcacheTasks.get(key));
+ System.out.println(" dest :" + destPath);
+ System.out.println(" key :" + key);
+ System.out.println(" value :" + destPath);
+ dCacheHander.put(key, destPath);
+ }
+ }
+ }
- private Map<String, String> initializeCustomProperties(
- Properties properties, String prefix) {
- Map<String, String> foundProperties = new HashMap<String, String>();
- Set<Entry<Object, Object>> entrySet = properties.entrySet();
- for (Entry entry : entrySet) {
- String key = (String) entry.getKey();
- String value = (String) entry.getValue();
- if ((key.startsWith(prefix))) {
- String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
- foundProperties.put(actualKey, value);
- }
- }
- return foundProperties;
- }
+ private String getApplicationNameForHadoopJob(JobConf jobConf) {
+ String jar = jobConf.getJar();
+ if (jar != null) {
+ return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0);
+ } else {
+ return "" + System.currentTimeMillis();
+ }
+ }
- public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, ',');
- Map<String, String> dcacheTasks = new HashMap<String, String>();
- Map<String, String> dcacheKeys = initializeCustomProperties(
- jobProperties, dacheKeyPrefix);
- for (String key : dcacheKeys.keySet()) {
- String sourcePath = dcacheKeys.get(key);
- if (sourcePath != null) {
- dcacheTasks.put(key, sourcePath);
- }
- }
- return dcacheTasks;
- }
+ private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+ Map<String, String> foundProperties = new HashMap<String, String>();
+ Set<Entry<Object, Object>> entrySet = properties.entrySet();
+ for (Entry entry : entrySet) {
+ String key = (String) entry.getKey();
+ String value = (String) entry.getValue();
+ if ((key.startsWith(prefix))) {
+ String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+ foundProperties.put(actualKey, value);
+ }
+ }
+ return foundProperties;
+ }
- public void waitForCompletion(UUID jobId) throws Exception {
- hyracksClient.waitForCompleton(jobId);
- }
+ public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, ',');
+ Map<String, String> dcacheTasks = new HashMap<String, String>();
+ Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
+ for (String key : dcacheKeys.keySet()) {
+ String sourcePath = dcacheKeys.get(key);
+ if (sourcePath != null) {
+ dcacheTasks.put(key, sourcePath);
+ }
+ }
+ return dcacheTasks;
+ }
- private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
- throws Exception {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- for (String jobFile : jobFiles) {
- jobConfs.add(constructHadoopJobConf(jobFile));
- }
- return jobConfs;
- }
+ public void waitForCompletion(JobId jobId) throws Exception {
+ hyracksClient.waitForCompleton(jobId);
+ }
- private JobConf constructHadoopJobConf(String jobFile) {
- Properties jobProperties = Utilities.getProperties(jobFile, '=');
- JobConf conf = new JobConf(hadoopAdapter.getConf());
- for (Entry entry : jobProperties.entrySet()) {
- conf.set((String) entry.getKey(), (String) entry.getValue());
- System.out.println((String) entry.getKey() + " : "
- + (String) entry.getValue());
- }
- return conf;
- }
+ private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ for (String jobFile : jobFiles) {
+ jobConfs.add(constructHadoopJobConf(jobFile));
+ }
+ return jobConfs;
+ }
- private String[] getJobs(CompatibilityConfig clConfig) {
- return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
- .split(",");
- }
+ private JobConf constructHadoopJobConf(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, '=');
+ JobConf conf = new JobConf(hadoopAdapter.getConf());
+ for (Entry entry : jobProperties.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+ }
+ return conf;
+ }
- public static void main(String args[]) throws Exception {
- long startTime = System.nanoTime();
- CompatibilityConfig clConfig = new CompatibilityConfig();
- CmdLineParser cp = new CmdLineParser(clConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- cp.printUsage(System.err);
- return;
- }
- CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
- String applicationName = clConfig.applicationName;
- String[] jobFiles = compatLayer.getJobs(clConfig);
- String[] userLibraries = null;
- if (clConfig.userLibs != null) {
- userLibraries = clConfig.userLibs.split(",");
- }
- try {
- HyracksRunningJob hyraxRunningJob = null;
- if (userLibraries != null) {
- Set<String> userLibs = new HashSet<String>();
- for (String userLib : userLibraries) {
- userLibs.add(userLib);
- }
- hyraxRunningJob = compatLayer.submitJobs(applicationName,
- jobFiles, userLibs);
- } else {
- hyraxRunningJob = compatLayer.submitJobs(applicationName,
- jobFiles);
- }
- compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
- long end_time = System.nanoTime();
- System.out.println("TOTAL TIME (from Launch to Completion):"
- + ((end_time - startTime) / (float) 1000000000.0)
- + " seconds.");
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
+ private String[] getJobs(CompatibilityConfig clConfig) {
+ return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+ }
+
+ public static void main(String args[]) throws Exception {
+ long startTime = System.nanoTime();
+ CompatibilityConfig clConfig = new CompatibilityConfig();
+ CmdLineParser cp = new CmdLineParser(clConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+ CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+ String applicationName = clConfig.applicationName;
+ String[] jobFiles = compatLayer.getJobs(clConfig);
+ String[] userLibraries = null;
+ if (clConfig.userLibs != null) {
+ userLibraries = clConfig.userLibs.split(",");
+ }
+ try {
+ HyracksRunningJob hyraxRunningJob = null;
+ if (userLibraries != null) {
+ Set<String> userLibs = new HashSet<String>();
+ for (String userLib : userLibraries) {
+ userLibs.add(userLib);
+ }
+ hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
+ } else {
+ hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
+ }
+ compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+ long end_time = System.nanoTime();
+ System.out.println("TOTAL TIME (from Launch to Completion):"
+ + ((end_time - startTime) / (float) 1000000000.0) + " seconds.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index f2f7d03..28833d7 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -40,7 +40,7 @@
import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -370,13 +370,13 @@
return externalSortOp;
}
- public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+ public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
JobConf conf, JobSpecification spec) {
Class mapOutputKeyClass = conf.getMapOutputKeyClass();
Class mapOutputValueClass = conf.getMapOutputValueClass();
- MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+ MToNPartitioningConnectorDescriptor connectorDescriptor = null;
ITuplePartitionComputerFactory factory = null;
conf.getMapOutputKeyClass();
if (conf.getPartitionerClass() != null
@@ -398,7 +398,7 @@
factory = new HadoopHashTuplePartitionComputerFactory(
mapOutputKeySerializerDerserializer);
}
- connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec,
+ connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
factory);
return connectorDescriptor;
}