Checked in Hadoop Compatibility layer
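
The layer translates one or more Hadoop JobConfs into a single Hyracks JobSpecification
(read -> map -> optional combine -> sort -> reduce -> write), submits it to the cluster
controller over RMI, and stages any dcache.key.* files from HDFS into the DCache server
before launch.

The appassembler plugin generates a bin/hadoop-compat launcher. A sketch of an invocation
follows; the host names and file names are illustrative only, not part of this change:

    bin/hadoop-compat -cluster cluster.conf -jobFiles wordcount.job -userLibs wordcount.jar

where cluster.conf holds '='-delimited key/value pairs such as:

    clusterControllerHost=cc-host
    fs.default.name=hdfs://namenode:9000
    dcacheServerConfiguration=dcache.conf

and, optionally, hyracksDataflowStdLib / hyracksDataflowCommonLib / hyracksDataflowHadoopLib /
hadoopCoreLib entries pointing at the system jars bundled into the application archive.
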
git-svn-id: https://hyracks.googlecode.com/svn/trunk@150 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hadoop-compat/.classpath b/hyracks/hyracks-hadoop-compat/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks/hyracks-hadoop-compat/.project b/hyracks/hyracks-hadoop-compat/.project
new file mode 100644
index 0000000..7d50383
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>hyracks-hadoop-compat</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..fd50bc4
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Tue Oct 19 11:05:30 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..e03a9fc
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Tue Oct 19 11:05:30 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
new file mode 100644
index 0000000..996eb37
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -0,0 +1,87 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hadoop-compat</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.hadoop.compat.driver.CompatibilityLayer</mainClass>
+ <name>hadoop-compat</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.dcache</groupId>
+ <artifactId>dcache-client</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
new file mode 100644
index 0000000..45c65a1
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
+public class HyracksClient {
+
+ private HadoopAdapter hadoopAdapter;
+ private static HyracksRMIConnection connection;
+ private static final String applicationName = "CompatibilityLayer";
+
+ public HyracksClient(String clusterConf) throws Exception {
+ Properties properties = Utilities.getProperties(clusterConf, '=');
+ String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+ String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
+ initialize(clusterController, fileSystem);
+ }
+
+ public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
+ initialize(clusterControllerAddr, fileSystem);
+ }
+
+ private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
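+ // Connect to the cluster controller's RMI registry (default port 1099) and remove any
+ // previously deployed application with the same name so a fresh one can be created at
+ // job submission time.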
+ connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
+ connection.destroyApplication(applicationName);
+ hadoopAdapter = new HadoopAdapter(namenodeUrl);
+ }
+
+ public HyracksRunningJob submitJobs(List<JobConf> confs, String[] requiredLibs) throws Exception {
+ JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
+ return submitJob(spec, requiredLibs);
+ }
+
+ public HyracksRunningJob submitJob(JobConf conf, String[] requiredLibs) throws Exception {
+ JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
+ return submitJob(spec, requiredLibs);
+ }
+
+ public JobStatus getJobStatus(UUID jobId) throws Exception {
+ return connection.getJobStatus(jobId);
+ }
+
+ public HyracksRunningJob submitJob(JobSpecification spec, String[] requiredLibs) throws Exception {
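+ // Creates a per-spec Hyracks application (named after the spec's hash) whose archive
+ // bundles the required jars, then starts the job under it.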
+ String applicationName = "" + spec.toString().hashCode();
+ connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
+ System.out.println(" created application :" + applicationName);
+ UUID jobId = connection.createJob(applicationName, spec);
+ connection.start(jobId);
+ HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+ return runningJob;
+ }
+
+ public HadoopAdapter getHadoopAdapter() {
+ return hadoopAdapter;
+ }
+
+ public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
+ this.hadoopAdapter = hadoopAdapter;
+ }
+
+ public void waitForCompletion(UUID jobId) throws Exception {
+ connection.waitForCompletion(jobId);
+ }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
new file mode 100644
index 0000000..79a9031
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class HyracksRunningJob implements RunningJob {
+
+ UUID jobId;
+ JobSpecification spec;
+ HyracksClient hyracksClient;
+
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(UUID jobId) {
+ this.jobId = jobId;
+ }
+
+ public JobSpecification getSpec() {
+ return spec;
+ }
+
+ public void setSpec(JobSpecification spec) {
+ this.spec = spec;
+ }
+
+ public HyracksRunningJob(UUID jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
+ this.spec = jobSpec;
+ this.jobId = jobId;
+ this.hyracksClient = hyracksClient;
+ }
+
+ @Override
+ public float cleanupProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public Counters getCounters() throws IOException {
+ return new Counters();
+ }
+
+ @Override
+ public JobID getID() {
+ return new JobID(this.jobId.toString(), 1);
+ }
+
+ @Override
+ public String getJobFile() {
+ return "";
+ }
+
+ @Override
+ public String getJobID() {
+ return this.jobId.toString();
+ }
+
+ @Override
+ public String getJobName() {
+ return this.jobId.toString();
+ }
+
+ @Override
+ public int getJobState() throws IOException {
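+ // Hadoop JobStatus codes: 2 == SUCCEEDED, 1 == RUNNING.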
+ return isComplete() ? 2 : 1;
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException {
+ return new TaskCompletionEvent[0];
+ }
+
+ @Override
+ public String getTrackingURL() {
+ return " running on hyrax, remote kill is not supported ";
+ }
+
+ @Override
+ public boolean isComplete() throws IOException {
+ edu.uci.ics.hyracks.api.job.JobStatus status = null;
+ try {
+ status = hyracksClient.getJobStatus(jobId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // A failed status lookup leaves status null; treat that as "not complete".
+ return status != null && status.equals(edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED);
+ }
+
+ @Override
+ public boolean isSuccessful() throws IOException {
+ return isComplete();
+ }
+
+ @Override
+ public void killJob() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void killTask(String taskId, boolean shouldFail) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public float mapProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float reduceProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void setJobPriority(String priority) throws IOException {
+
+ }
+
+ @Override
+ public float setupProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void waitForCompletion() throws IOException {
+ while (!isComplete()) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void killTask(TaskAttemptID arg0, boolean arg1) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
new file mode 100644
index 0000000..bbf0a8b
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.driver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
+import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
+import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
+public class CompatibilityLayer {
+
+ HyracksClient hyracksClient;
+ DCacheHandler dCacheHander = null;
+ Properties clusterConf;
+ String[] systemLibs;
+
+ private static char configurationFileDelimiter = '=';
+ private static final String dacheKeyPrefix = "dcache.key";
+
+ public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+ initialize(clConfig);
+ }
+
+ public HyracksRunningJob submitJobs(String[] jobFiles, String[] userLibs) throws Exception {
+ String[] requiredLibs = getRequiredLibs(userLibs);
+ List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+ Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
+ String tempDir = "/tmp";
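+ // Stage DCache entries before launch: each "dcache.key.<name>" property in the first job
+ // file names an HDFS path; it is copied to a local temp file and published to the DCache
+ // server under <name>.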
+ if (dcacheTasks.size() > 0) {
+ HadoopAdapter hadoopAdapter = hyracksClient.getHadoopAdapter();
+ for (String key : dcacheTasks.keySet()) {
+ String destPath = tempDir + "/" + key + System.currentTimeMillis();
+ hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+ System.out.println(" source :" + dcacheTasks.get(key));
+ System.out.println(" dest :" + destPath);
+ System.out.println(" key :" + key);
+ System.out.println(" value :" + destPath);
+ dCacheHander.put(key, destPath);
+ }
+ }
+ HyracksRunningJob hyraxRunningJob = hyracksClient.submitJobs(jobConfs, requiredLibs);
+ return hyraxRunningJob;
+ }
+
+ private String[] getRequiredLibs(String[] userLibs) {
+ List<String> requiredLibs = new ArrayList<String>();
+ for (String systemLib : systemLibs) {
+ requiredLibs.add(systemLib);
+ }
+ for (String userLib : userLibs) {
+ requiredLibs.add(userLib);
+ }
+ return requiredLibs.toArray(new String[] {});
+ }
+
+ private void initialize(CompatibilityConfig clConfig) throws Exception {
+ clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+ List<String> systemLibs = new ArrayList<String>();
+ for (String systemLib : ConfigurationConstants.systemLibs) {
+ String systemLibPath = clusterConf.getProperty(systemLib);
+ if (systemLibPath != null) {
+ systemLibs.add(systemLibPath);
+ }
+ }
+ this.systemLibs = systemLibs.toArray(new String[] {});
+ String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
+ String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
+ String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
+ hyracksClient = new HyracksClient(clusterControllerHost, fileSystem);
+ try {
+ dCacheHander = new DCacheHandler(dacheServerConfiguration);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+ Map<String, String> foundProperties = new HashMap<String, String>();
+ Set<Entry<Object, Object>> entrySet = properties.entrySet();
+ for (Entry entry : entrySet) {
+ String key = (String) entry.getKey();
+ String value = (String) entry.getValue();
+ if ((key.startsWith(prefix))) {
+ String actualKey = key.substring(prefix.length() + 1); // strip the leading "<prefix>." from the key
+ foundProperties.put(actualKey, value);
+ }
+ }
+ return foundProperties;
+ }
+
+ public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, ',');
+ Map<String, String> dcacheTasks = new HashMap<String, String>();
+ Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
+ for (String key : dcacheKeys.keySet()) {
+ String sourcePath = dcacheKeys.get(key);
+ if (sourcePath != null) {
+ dcacheTasks.put(key, sourcePath);
+ }
+ }
+ return dcacheTasks;
+ }
+
+ public void waitForCompletion(UUID jobId) throws Exception {
+ hyracksClient.waitForCompletion(jobId);
+ }
+
+ public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, String[] userLibs) {
+ HyracksRunningJob hyraxRunningJob = null;
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ jobConfs.add(jobConf);
+ try {
+ hyraxRunningJob = hyracksClient.submitJobs(jobConfs, getRequiredLibs(userLibs));
+ System.out.println(" Result in " + jobConf.get("mapred.output.dir"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return hyraxRunningJob;
+ }
+
+ public HyracksRunningJob submitJob(JobSpecification jobSpec, String[] userLibs) {
+ HyracksRunningJob hyraxRunningJob = null;
+ try {
+ hyraxRunningJob = hyracksClient.submitJob(jobSpec, getRequiredLibs(userLibs));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return hyraxRunningJob;
+ }
+
+ private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ for (String jobFile : jobFiles) {
+ jobConfs.add(constructHadoopJobConf(jobFile));
+ }
+ return jobConfs;
+ }
+
+ private JobConf constructHadoopJobConf(String jobFile) {
+ Properties jobProperties = Utilities.getProperties(jobFile, '=');
+ JobConf conf = hyracksClient.getHadoopAdapter().getConf();
+ for (Entry entry : jobProperties.entrySet()) {
+ conf.set((String) entry.getKey(), (String) entry.getValue());
+ System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+ }
+ return conf;
+ }
+
+ private String[] getJobs(CompatibilityConfig clConfig) {
+ return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+ }
+
+ public static void main(String args[]) throws Exception {
+ long startTime = System.nanoTime();
+ CompatibilityConfig clConfig = new CompatibilityConfig();
+ CmdLineParser cp = new CmdLineParser(clConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+ CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+ String[] jobFiles = compatLayer.getJobs(clConfig);
+ String[] userLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+ HyracksRunningJob hyraxRunningJob = null;
+ try {
+ hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
+ // Blocks until the Hyracks job terminates.
+ compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ long end_time = System.nanoTime();
+ System.out.println("TOTAL TIME (from Launch to Completion):" + ((end_time - startTime) / (float) 1000000000.0)
+ + " seconds.");
+ }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
new file mode 100644
index 0000000..cf5b2df
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import org.kohsuke.args4j.Option;
+
+public class CompatibilityConfig {
+
+ @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
+ + " (1) Address of HyracksClusterController service" + " (2) Address of Hadoop namenode service")
+ public String clusterConf;
+
+ @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
+ + "Each job file defines the hadoop job + " + "The order in the list defines the sequence in which"
+ + "the jobs are to be executed")
+ public String jobFiles;
+
+ @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running "
+ + " mappers/reducers etc ")
+ public String userLibs;
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
new file mode 100644
index 0000000..049191c
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+public class ConfigurationConstants {
+
+ public static final String clusterControllerHost = "clusterControllerHost";
+ public static final String namenodeURL = "fs.default.name";
+ public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
+ public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
+ "hyracksDataflowHadoopLib", "hadoopCoreLib" };
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
new file mode 100644
index 0000000..5b77548
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+
+public class DCacheHandler {
+
+ String clientPropertyFile;
+ String key;
+ String valueFilePath;
+
+ public enum Operation {
+ GET,
+ PUT,
+ DELETE
+ }
+
+ private static DCacheClient dCacheClient;
+ private static final String[] operations = { "GET", "PUT", "DELETE" };
+
+ public DCacheHandler(String clientPropertyFile) throws Exception {
+ this.clientPropertyFile = clientPropertyFile;
+ init();
+ }
+
+ private void init() throws Exception {
+ dCacheClient = DCacheClient.get();
+ DCacheClientConfig dcacheClientConfig = new DCacheClientConfig();
+ dCacheClient.init(dcacheClientConfig);
+ }
+
+ public static DCacheClient getInstance(String clientPropertyFile) {
+ if (dCacheClient == null) {
+ dCacheClient = DCacheClient.get();
+ }
+ return dCacheClient;
+ }
+
+ public String getClientPropertyFile() {
+ return clientPropertyFile;
+ }
+
+ public void put(String key, String value) throws IOException {
+ dCacheClient.set(key, value);
+ System.out.println(" added to cache " + key + " : " + value);
+ }
+
+ public Object get(String key) throws IOException {
+ return dCacheClient.get(key);
+ }
+
+ public void delete(String key) throws IOException {
+ dCacheClient.delete(key);
+ }
+
+ public Object performOperation(String operation, String[] args) throws Exception {
+ Object returnValue = null;
+ int operationIndex = getOperation(operation);
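+ // operationIndex indexes into {GET, PUT, DELETE}; args[2] is the key and args[3] (PUT only) the value.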
+ switch (operationIndex) {
+ case 0:
+ returnValue = dCacheClient.get(args[2]);
+ System.out.println(" get from cache " + returnValue);
+ break;
+ case 1:
+ dCacheClient.set(args[2], args[3]);
+ System.out.println(" added to cache " + args[2] + " : " + args[3]);
+ break;
+ case 2:
+ dCacheClient.delete(args[2]);
+ System.out.println(" removed from cache " + args[2]);
+ break;
+ default:
+ System.out.println("Error : Operation not supported !");
+ break;
+ }
+ return returnValue;
+ }
+
+ private int getOperation(String operation) {
+ for (int i = 0; i < operations.length; i++) {
+ if (operations[i].equalsIgnoreCase(operation)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
new file mode 100644
index 0000000..f4fcd83
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReadOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+
+public class HadoopAdapter {
+
+ public static final String FS_DEFAULT_NAME = "fs.default.name";
+ private JobConf jobConf;
+
+ public HadoopAdapter(String namenodeUrl) {
+ jobConf = new JobConf(true);
+ jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+ }
+
+ public JobConf getConf() {
+ return jobConf;
+ }
+
+ public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
+ throws IOException {
+ VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
+ jobConf);
+ return versionedProtocol;
+ }
+
+ private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
+ RecordDescriptor recordDescriptor = null;
+ try {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(className1),
+ (Class<? extends Writable>) Class.forName(className2));
+ } catch (ClassNotFoundException cnfe) {
+ cnfe.printStackTrace();
+ }
+ return recordDescriptor;
+ }
+
+ private InputSplit[] getInputSplits(JobConf jobConf) throws IOException {
+ InputFormat inputFormat = jobConf.getInputFormat();
+ return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+ }
+
+ public HadoopMapperOperatorDescriptor getMapper(JobConf conf, InputSplit[] splits, JobSpecification spec)
+ throws IOException {
+ HadoopMapperOperatorDescriptor mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+ new ClasspathBasedHadoopClassFactory());
+ return mapOp;
+ }
+
+ public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
+ HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
+ new ClasspathBasedHadoopClassFactory());
+ return reduceOp;
+ }
+
+ public FileSystem getHDFSClient() {
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = FileSystem.get(jobConf);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ return fileSystem;
+ }
+
+ public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
+ JobSpecification spec = null;
+ if (jobConfs.size() == 1) {
+ spec = getJobSpecification(jobConfs.get(0));
+ } else {
+ spec = getPipelinedSpec(jobConfs);
+ }
+ return spec;
+ }
+
+ private IOperatorDescriptor configureOutput(IOperatorDescriptor previousOperator, JobConf conf,
+ JobSpecification spec) throws Exception {
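+ // The writer inherits the producer's partition constraint; the number of output partitions
+ // defaults to the reduce-task count, or to the producer's partition count for map-only jobs.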
+ PartitionConstraint previousOpConstraint = previousOperator.getPartitionConstraint();
+ int noOfInputs = previousOpConstraint instanceof PartitionCountConstraint ? ((PartitionCountConstraint) previousOpConstraint)
+ .getCount() : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
+ int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfInputs;
+ HadoopWriteOperatorDescriptor writer = null;
+ writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+ writer.setPartitionConstraint(previousOperator.getPartitionConstraint());
+ spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
+ return writer;
+ }
+
+ private IOperatorDescriptor addMRToExistingPipeline(IOperatorDescriptor previousOperator, JobConf jobConf,
+ JobSpecification spec, InputSplit[] splits) throws IOException {
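+ // Appends one map-reduce stage to the pipeline: a mapper fed one-to-one from the previous
+ // operator, an optional map-side combiner (in-memory sort + reducer), and, when reduce tasks
+ // are configured, an M-to-N hash-partitioned sort + reducer.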
+ HadoopMapperOperatorDescriptor mapOp = getMapper(jobConf, splits, spec);
+ IOperatorDescriptor mrOutputOperator = mapOp;
+ PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
+ mapOp.setPartitionConstraint(mapperPartitionConstraint);
+ spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapOp, 0);
+ IOperatorDescriptor mapOutputOperator = mapOp;
+
+ boolean useCombiner = (jobConf.getCombinerClass() != null);
+ if (useCombiner) {
+ System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
+ InMemorySortOperatorDescriptor mapSideCombineSortOp = getSorter(jobConf, spec);
+ mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
+
+ HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
+ mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
+ mapOutputOperator = mapSideCombineReduceOp;
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), mapOp, 0, mapSideCombineSortOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+ mrOutputOperator = mapSideCombineReduceOp;
+ }
+
+ if (jobConf.getNumReduceTasks() != 0) {
+ IOperatorDescriptor sorter = getSorter(jobConf, spec);
+ HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
+
+ PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(jobConf.getNumReduceTasks());
+ sorter.setPartitionConstraint(reducerPartitionConstraint);
+ reducer.setPartitionConstraint(reducerPartitionConstraint);
+
+ IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
+ spec.connect(mToNConnectorDescriptor, mapOutputOperator, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
+ mrOutputOperator = reducer;
+ }
+ return mrOutputOperator;
+ }
+
+ public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
+ JobSpecification spec = new JobSpecification();
+ Iterator<JobConf> iterator = jobConfs.iterator();
+ JobConf firstMR = iterator.next();
+ InputSplit[] splits = getInputSplits(firstMR);
+ IOperatorDescriptor reader = new HadoopReadOperatorDescriptor(firstMR, spec, splits);
+ IOperatorDescriptor outputOperator = reader;
+ outputOperator = addMRToExistingPipeline(reader, firstMR, spec, splits);
+ // Chain each remaining job onto the end of the pipeline.
+ while (iterator.hasNext()) {
+ JobConf currentJobConf = iterator.next();
+ outputOperator = addMRToExistingPipeline(outputOperator, currentJobConf, spec, null);
+ }
+ configureOutput(outputOperator, jobConfs.get(jobConfs.size() - 1), spec);
+ return spec;
+ }
+
+ public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+ JobSpecification spec = new JobSpecification();
+ InputSplit[] splits = getInputSplits(conf);
+ HadoopReadOperatorDescriptor hadoopReadOperatorDescriptor = new HadoopReadOperatorDescriptor(conf, spec, splits);
+ IOperatorDescriptor mrOutputOperator = addMRToExistingPipeline(hadoopReadOperatorDescriptor, conf, spec, splits);
+ IOperatorDescriptor printer = configureOutput(mrOutputOperator, conf, spec);
+ spec.addRoot(printer);
+ return spec;
+ }
+
+ public static InMemorySortOperatorDescriptor getSorter(JobConf conf, JobSpecification spec) {
+ InMemorySortOperatorDescriptor inMemorySortOp = null;
+ RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
+ .getMapOutputValueClass().getName());
+ Class<? extends RawComparator> rawComparatorClass = null;
+ WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
+ WritableComparable.class));
+ WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+ writableComparator.getClass());
+ inMemorySortOp = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
+ return inMemorySortOp;
+ }
+
+ public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
+ JobSpecification spec) {
+
+ Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+ Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+ MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+ ITuplePartitionComputerFactory factory = null;
+ // Use a custom (non org.apache.hadoop) partitioner when configured; otherwise hash on the serialized map-output key.
+ if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
+ Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
+ factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner,
+ DatatypeHelper.createSerializerDeserializer(mapOutputKeyClass),
+ DatatypeHelper.createSerializerDeserializer(mapOutputValueClass));
+ } else {
+ RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
+ mapOutputValueClass);
+ ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
+ .createSerializerDeserializer(mapOutputKeyClass);
+ factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDerserializer);
+ }
+ connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec, factory);
+ return connectorDescriptor;
+ }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
new file mode 100644
index 0000000..d6bd652
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Properties;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+public class Utilities {
+
+ public static Properties getProperties(String filePath, char delimiter) {
+ Properties properties = new Properties();
+ try {
+ FileInputStream fins = new FileInputStream(new File(filePath));
+ DataInputStream dins = new DataInputStream(fins);
+ BufferedReader br = new BufferedReader(new InputStreamReader(dins));
+ String strLine;
+ while ((strLine = br.readLine()) != null) {
+ int split = strLine.indexOf(delimiter);
+ if (split >= 0) {
+ properties.put((strLine.substring(0, split)).trim(), strLine.substring(split + 1, strLine.length())
+ .trim());
+ }
+ }
+ br.close();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ return properties;
+ }
+
+ public static File getHyracksArchive(String applicationName, String[] libJars) {
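+ // Bundles the given jars into "<applicationName>.zip"; HyracksClient ships this archive to
+ // the cluster controller when it creates the application.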
+ String target = applicationName + ".zip";
+ // Create a buffer for reading the files
+ byte[] buf = new byte[1024];
+ try {
+ ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
+ for (int i = 0; i < libJars.length; i++) {
+ String fileName = libJars[i].substring(libJars[i].lastIndexOf("/") + 1);
+ FileInputStream in = new FileInputStream(libJars[i]);
+ out.putNextEntry(new ZipEntry(fileName));
+ int len;
+ while ((len = in.read(buf)) > 0) {
+ out.write(buf, 0, len);
+ }
+ out.closeEntry();
+ in.close();
+ }
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ File har = new File(target);
+ har.deleteOnExit();
+ return har;
+ }
+
+}