Checked in Hadoop Compatibility layer

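A minimal usage sketch (the host, file, and jar names below are illustrative, not
part of this change): after building with "mvn package", the appassembler script
produced by this module can be invoked as

    target/appassembler/bin/hadoop-compat -cluster cluster.conf -jobFiles job1.conf,job2.conf -userLibs myudfs.jar

where cluster.conf provides clusterControllerHost and fs.default.name as key=value
pairs, each job file lists Hadoop configuration properties one key=value per line,
and optional dcache.key.* entries in the first job file name files to be staged
through the DCache server before launch.
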
git-svn-id: https://hyracks.googlecode.com/svn/trunk@150 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hadoop-compat/.classpath b/hyracks/hyracks-hadoop-compat/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks/hyracks-hadoop-compat/.project b/hyracks/hyracks-hadoop-compat/.project
new file mode 100644
index 0000000..7d50383
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-hadoop-compat</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..fd50bc4
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Tue Oct 19 11:05:30 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..e03a9fc
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Tue Oct 19 11:05:30 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
new file mode 100644
index 0000000..996eb37
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -0,0 +1,87 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-hadoop-compat</artifactId>
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.hadoop.compat.CompatibilityLayer</mainClass>
+                  <name>hadoop-compat</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>0.20.2</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.dcache</groupId>
+      <artifactId>dcache-client</artifactId>
+      <version>0.0.1</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-dataflow-hadoop</artifactId>
+      <version>0.1.3-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
new file mode 100644
index 0000000..45c65a1
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
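+/**
+ * Thin client for submitting Hadoop jobs to a Hyracks cluster controller over RMI:
+ * it converts JobConfs into a JobSpecification through HadoopAdapter, deploys the
+ * required jars as a Hyracks application, and starts and monitors the resulting job.
+ */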
+public class HyracksClient {
+
+    private HadoopAdapter hadoopAdapter;
+    private static HyracksRMIConnection connection;
+    private static final String applicationName = "CompatibilityLayer";
+
+    public HyracksClient(String clusterConf) throws Exception {
+        Properties properties = Utilities.getProperties(clusterConf, '=');
+        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+        String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
+        initialize(clusterController, fileSystem);
+    }
+
+    public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
+        initialize(clusterControllerAddr, fileSystem);
+    }
+
+    private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
+        connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
+        connection.destroyApplication(applicationName);
+        hadoopAdapter = new HadoopAdapter(namenodeUrl);
+    }
+
+    public HyracksRunningJob submitJobs(List<JobConf> confs, String[] requiredLibs) throws Exception {
+        JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
+        return submitJob(spec, requiredLibs);
+    }
+
+    public HyracksRunningJob submitJob(JobConf conf, String[] requiredLibs) throws Exception {
+        JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
+        return submitJob(spec, requiredLibs);
+    }
+
+    public JobStatus getJobStatus(UUID jobId) throws Exception {
+        return connection.getJobStatus(jobId);
+    }
+
+    public HyracksRunningJob submitJob(JobSpecification spec, String[] requiredLibs) throws Exception {
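+        // Deploy each submission as its own Hyracks application, named after the
+        // spec's hash code, with the required libraries bundled into the archive.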
+        String applicationName = "" + spec.toString().hashCode();
+        connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
+        System.out.println(" created application :" + applicationName);
+        UUID jobId = connection.createJob(applicationName, spec);
+        connection.start(jobId);
+        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+        return runningJob;
+    }
+
+    public HadoopAdapter getHadoopAdapter() {
+        return hadoopAdapter;
+    }
+
+    public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
+        this.hadoopAdapter = hadoopAdapter;
+    }
+
+    public void waitForCompletion(UUID jobId) throws Exception {
+        connection.waitForCompletion(jobId);
+    }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
new file mode 100644
index 0000000..79a9031
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
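+/**
+ * Adapts a submitted Hyracks job to Hadoop's RunningJob interface so existing
+ * Hadoop tooling can poll it for completion. Task-level progress and counters are
+ * not tracked: the progress methods return 0 and getCounters() returns an empty
+ * Counters object.
+ */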
+public class HyracksRunningJob implements RunningJob {
+
+    UUID jobId;
+    JobSpecification spec;
+    HyracksClient hyracksClient;
+
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    public void setSpec(JobSpecification spec) {
+        this.spec = spec;
+    }
+
+    public HyracksRunningJob(UUID jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
+        this.spec = jobSpec;
+        this.jobId = jobId;
+        this.hyracksClient = hyracksClient;
+    }
+
+    @Override
+    public float cleanupProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public Counters getCounters() throws IOException {
+        return new Counters();
+    }
+
+    @Override
+    public JobID getID() {
+        return new JobID(this.jobId.toString(), 1);
+    }
+
+    @Override
+    public String getJobFile() {
+        return "";
+    }
+
+    @Override
+    public String getJobID() {
+        return this.jobId.toString();
+    }
+
+    @Override
+    public String getJobName() {
+        return this.jobId.toString();
+    }
+
+    @Override
+    public int getJobState() throws IOException {
+        return isComplete() ? org.apache.hadoop.mapred.JobStatus.SUCCEEDED
+                : org.apache.hadoop.mapred.JobStatus.RUNNING;
+    }
+
+    @Override
+    public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException {
+        return new TaskCompletionEvent[0];
+    }
+
+    @Override
+    public String getTrackingURL() {
+        return " running on hyrax, remote kill is not supported ";
+    }
+
+    @Override
+    public boolean isComplete() throws IOException {
+        edu.uci.ics.hyracks.api.job.JobStatus status;
+        try {
+            status = hyracksClient.getJobStatus(jobId);
+        } catch (Exception e) {
+            // Propagate the failure instead of dereferencing a null status below.
+            throw new IOException("could not fetch status of job " + jobId, e);
+        }
+        return edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED.equals(status);
+    }
+
+    @Override
+    public boolean isSuccessful() throws IOException {
+        return isComplete();
+    }
+
+    @Override
+    public void killJob() throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void killTask(String taskId, boolean shouldFail) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public float mapProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public float reduceProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void setJobPriority(String priority) throws IOException {
+
+    }
+
+    @Override
+    public float setupProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void waitForCompletion() throws IOException {
+        while (!isComplete()) {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException ie) {
+                // Preserve the interrupt status and stop waiting.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    @Override
+    public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void killTask(TaskAttemptID arg0, boolean arg1) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
new file mode 100644
index 0000000..bbf0a8b
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.driver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
+import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
+import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
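+/**
+ * Command-line driver that runs Hadoop jobs on Hyracks: it reads the cluster
+ * configuration, stages any DCache entries declared in the first job file,
+ * converts each job file into a JobConf, and submits the resulting pipeline
+ * through HyracksClient.
+ */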
+public class CompatibilityLayer {
+
+    HyracksClient hyracksClient;
+    DCacheHandler dCacheHandler = null;
+    Properties clusterConf;
+    String[] systemLibs;
+
+    private static char configurationFileDelimiter = '=';
+    private static final String dcacheKeyPrefix = "dcache.key";
+
+    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+        initialize(clConfig);
+    }
+
+    public HyracksRunningJob submitJobs(String[] jobFiles, String[] userLibs) throws Exception {
+        String[] requiredLibs = getRequiredLibs(userLibs);
+        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+        Map<String, String> dcacheTasks = jobFiles.length > 0 ? preparePreLaunchDCacheTasks(jobFiles[0])
+                : new HashMap<String, String>();
+        String tempDir = "/tmp";
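+        // Copy each declared DCache entry out of HDFS to a unique local path and
+        // register that path with the DCache server before launching the job.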
+        if (dcacheTasks.size() > 0) {
+            HadoopAdapter hadoopAdapter = hyracksClient.getHadoopAdapter();
+            for (String key : dcacheTasks.keySet()) {
+                String destPath = tempDir + "/" + key + System.currentTimeMillis();
+                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+                System.out.println(" source :" + dcacheTasks.get(key));
+                System.out.println(" dest :" + destPath);
+                System.out.println(" key :" + key);
+                System.out.println(" value :" + destPath);
+                dCacheHandler.put(key, destPath);
+            }
+        }
+        HyracksRunningJob hyraxRunningJob = hyracksClient.submitJobs(jobConfs, requiredLibs);
+        return hyraxRunningJob;
+    }
+
+    private String[] getRequiredLibs(String[] userLibs) {
+        List<String> requiredLibs = new ArrayList<String>();
+        for (String systemLib : systemLibs) {
+            requiredLibs.add(systemLib);
+        }
+        for (String userLib : userLibs) {
+            requiredLibs.add(userLib);
+        }
+        return requiredLibs.toArray(new String[] {});
+    }
+
+    private void initialize(CompatibilityConfig clConfig) throws Exception {
+        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+        List<String> systemLibs = new ArrayList<String>();
+        for (String systemLib : ConfigurationConstants.systemLibs) {
+            String systemLibPath = clusterConf.getProperty(systemLib);
+            if (systemLibPath != null) {
+                systemLibs.add(systemLibPath);
+            }
+        }
+        this.systemLibs = systemLibs.toArray(new String[] {});
+        String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
+        String dcacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
+        String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
+        hyracksClient = new HyracksClient(clusterControllerHost, fileSystem);
+        try {
+            dCacheHandler = new DCacheHandler(dcacheServerConfiguration);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+        Map<String, String> foundProperties = new HashMap<String, String>();
+        Set<Entry<Object, Object>> entrySet = properties.entrySet();
+        for (Entry<Object, Object> entry : entrySet) {
+            String key = (String) entry.getKey();
+            String value = (String) entry.getValue();
+            if (key.startsWith(prefix)) {
+                String actualKey = key.substring(prefix.length() + 1); // cut off "<prefix>." from the beginning
+                foundProperties.put(actualKey, value);
+            }
+        }
+        return foundProperties;
+    }
+
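+    // Collects the "dcache.key.*" entries from the job file; each names a file
+    // that must be staged through the DCache server before the job is launched.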
+    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, ',');
+        Map<String, String> dcacheTasks = new HashMap<String, String>();
+        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dcacheKeyPrefix);
+        for (String key : dcacheKeys.keySet()) {
+            String sourcePath = dcacheKeys.get(key);
+            if (sourcePath != null) {
+                dcacheTasks.put(key, sourcePath);
+            }
+        }
+        return dcacheTasks;
+    }
+
+    public void waitForCompletion(UUID jobId) throws Exception {
+        hyracksClient.waitForCompletion(jobId);
+    }
+
+    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, String[] userLibs) {
+        HyracksRunningJob hyraxRunningJob = null;
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        jobConfs.add(jobConf);
+        try {
+            hyraxRunningJob = hyracksClient.submitJobs(jobConfs, getRequiredLibs(userLibs));
+            System.out.println(" Result in " + jobConf.get("mapred.output.dir"));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return hyraxRunningJob;
+    }
+
+    public HyracksRunningJob submitJob(JobSpecification jobSpec, String[] userLibs) {
+        HyracksRunningJob hyraxRunningJob = null;
+        try {
+            hyraxRunningJob = hyracksClient.submitJob(jobSpec, getRequiredLibs(userLibs));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return hyraxRunningJob;
+    }
+
+    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        for (String jobFile : jobFiles) {
+            jobConfs.add(constructHadoopJobConf(jobFile));
+        }
+        return jobConfs;
+    }
+
+    private JobConf constructHadoopJobConf(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, '=');
+        JobConf conf = hyracksClient.getHadoopAdapter().getConf();
+        for (Entry<Object, Object> entry : jobProperties.entrySet()) {
+            conf.set((String) entry.getKey(), (String) entry.getValue());
+            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+        }
+        return conf;
+    }
+
+    private String[] getJobs(CompatibilityConfig clConfig) {
+        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+    }
+
+    public static void main(String args[]) throws Exception {
+        long startTime = System.nanoTime();
+        CompatibilityConfig clConfig = new CompatibilityConfig();
+        CmdLineParser cp = new CmdLineParser(clConfig);
+        try {
+            cp.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            cp.printUsage(System.err);
+            return;
+        }
+        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+        String[] jobFiles = compatLayer.getJobs(clConfig);
+        String[] userLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+        try {
+            HyracksRunningJob hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
+            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        long endTime = System.nanoTime();
+        System.out.println("TOTAL TIME (from Launch to Completion): " + ((endTime - startTime) / (float) 1000000000.0)
+                + " seconds.");
+    }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
new file mode 100644
index 0000000..cf5b2df
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import org.kohsuke.args4j.Option;
+
+public class CompatibilityConfig {
+
+    @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
+            + " (1) Address of HyracksClusterController service" + " (2) Address of Hadoop namenode service")
+    public String clusterConf;
+
+    @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
+            + "Each job file defines the hadoop job + " + "The order in the list defines the sequence in which"
+            + "the jobs are to be executed")
+    public String jobFiles;
+
+    @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running "
+            + " mappers/reducers etc ")
+    public String userLibs;
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
new file mode 100644
index 0000000..049191c
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
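+/**
+ * Property names read from the cluster configuration file: the cluster controller
+ * host, the namenode URL, the DCache server configuration, and the keys under
+ * which the paths of the system library jars are listed.
+ */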
+public class ConfigurationConstants {
+
+    public static final String clusterControllerHost = "clusterControllerHost";
+    public static final String namenodeURL = "fs.default.name";
+    public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
+    public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
+            "hyracksDataflowHadoopLib", "hadoopCoreLib" };
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
new file mode 100644
index 0000000..5b77548
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+
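+/**
+ * Thin wrapper around the DCacheClient singleton, used to publish (and look up or
+ * delete) key-to-local-path mappings for files staged ahead of a job run.
+ */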
+public class DCacheHandler {
+
+    String clientPropertyFile;
+    String key;
+    String valueFilePath;
+
+    public enum Operation {
+        GET,
+        PUT,
+        DELETE
+    }
+
+    private static DCacheClient dCacheClient;
+    private static final String[] operations = { "GET", "PUT", "DELETE" };
+
+    public DCacheHandler(String clientPropertyFile) throws Exception {
+        this.clientPropertyFile = clientPropertyFile;
+        init();
+    }
+
+    private void init() throws Exception {
+        dCacheClient = DCacheClient.get();
+        DCacheClientConfig dcacheClientConfig = new DCacheClientConfig();
+        dCacheClient.init(dcacheClientConfig);
+    }
+
+    public static DCacheClient getInstance(String clientPropertyFile) {
+        if (dCacheClient == null) {
+            dCacheClient = DCacheClient.get();
+        }
+        return dCacheClient;
+    }
+
+    public String getClientPropertyFile() {
+        return clientPropertyFile;
+    }
+
+    public void put(String key, String value) throws IOException {
+        dCacheClient.set(key, value);
+        System.out.println(" added to cache " + key + " : " + value);
+    }
+
+    public Object get(String key) throws IOException {
+        return dCacheClient.get(key);
+    }
+
+    public void delete(String key) throws IOException {
+        dCacheClient.delete(key);
+    }
+
+    public Object performOperation(String operation, String[] args) throws Exception {
+        Object returnValue = null;
+        int operationIndex = getOperation(operation);
+        switch (operationIndex) {
+            case 0:
+                returnValue = dCacheClient.get(args[2]);
+                System.out.println(" get from cache " + returnValue);
+                break;
+            case 1:
+                dCacheClient.set(args[2], args[3]);
+                System.out.println(" added to cache " + args[2] + " : " + args[3]);
+                break;
+            case 2:
+                dCacheClient.delete(args[2]);
+                System.out.println(" removed from cache " + args[2]);
+                break;
+            default:
+                System.out.println("Error : Operation not supported !");
+                break;
+        }
+        return returnValue;
+    }
+
+    private int getOperation(String operation) {
+        for (int i = 0; i < operations.length; i++) {
+            if (operations[i].equalsIgnoreCase(operation)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
new file mode 100644
index 0000000..f4fcd83
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReadOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+
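+/**
+ * Translates Hadoop map/reduce JobConfs into a Hyracks JobSpecification. A single
+ * JobConf becomes read -> map -> (optional map-side combine: in-memory sort +
+ * reducer) -> M-to-N hash-partitioned shuffle -> sort -> reduce -> write; a list
+ * of JobConfs is chained into one pipelined specification. Input splits are
+ * computed against the HDFS namenode configured via fs.default.name.
+ */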
+public class HadoopAdapter {
+
+    public static final String FS_DEFAULT_NAME = "fs.default.name";
+    private JobConf jobConf;
+
+    public HadoopAdapter(String namenodeUrl) {
+        jobConf = new JobConf(true);
+        jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+    }
+
+    public JobConf getConf() {
+        return jobConf;
+    }
+
+    public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
+            throws IOException {
+        VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
+                jobConf);
+        return versionedProtocol;
+    }
+
+    private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
+        RecordDescriptor recordDescriptor = null;
+        try {
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                    (Class<? extends Writable>) Class.forName(className1),
+                    (Class<? extends Writable>) Class.forName(className2));
+        } catch (ClassNotFoundException cnfe) {
+            cnfe.printStackTrace();
+        }
+        return recordDescriptor;
+    }
+
+    private InputSplit[] getInputSplits(JobConf jobConf) throws IOException {
+        InputFormat inputFormat = jobConf.getInputFormat();
+        return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+    }
+
+    public HadoopMapperOperatorDescriptor getMapper(JobConf conf, InputSplit[] splits, JobSpecification spec)
+            throws IOException {
+        HadoopMapperOperatorDescriptor mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+                new ClasspathBasedHadoopClassFactory());
+        return mapOp;
+    }
+
+    public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
+        HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
+                new ClasspathBasedHadoopClassFactory());
+        return reduceOp;
+    }
+
+    public FileSystem getHDFSClient() {
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = FileSystem.get(jobConf);
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+        return fileSystem;
+    }
+
+    public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
+        JobSpecification spec = null;
+        if (jobConfs.size() == 1) {
+            spec = getJobSpecification(jobConfs.get(0));
+        } else {
+            spec = getPipelinedSpec(jobConfs);
+        }
+        return spec;
+    }
+
+    private IOperatorDescriptor configureOutput(IOperatorDescriptor previousOperator, JobConf conf,
+            JobSpecification spec) throws Exception {
+        PartitionConstraint previousOpConstraint = previousOperator.getPartitionConstraint();
+        int noOfInputs = previousOpConstraint instanceof PartitionCountConstraint ? ((PartitionCountConstraint) previousOpConstraint)
+                .getCount() : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
+        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfInputs;
+        HadoopWriteOperatorDescriptor writer = null;
+        writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+        writer.setPartitionConstraint(previousOperator.getPartitionConstraint());
+        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
+        return writer;
+    }
+
+    private IOperatorDescriptor addMRToExistingPipeline(IOperatorDescriptor previousOperator, JobConf jobConf,
+            JobSpecification spec, InputSplit[] splits) throws IOException {
+        HadoopMapperOperatorDescriptor mapOp = getMapper(jobConf, splits, spec);
+        IOperatorDescriptor mrOutputOperator = mapOp;
+        PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
+        mapOp.setPartitionConstraint(mapperPartitionConstraint);
+        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapOp, 0);
+        IOperatorDescriptor mapOutputOperator = mapOp;
+
+        boolean useCombiner = (jobConf.getCombinerClass() != null);
+        if (useCombiner) {
+            System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
+            InMemorySortOperatorDescriptor mapSideCombineSortOp = getSorter(jobConf, spec);
+            mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
+
+            HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
+            mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
+            mapOutputOperator = mapSideCombineReduceOp;
+
+            spec.connect(new OneToOneConnectorDescriptor(spec), mapOp, 0, mapSideCombineSortOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+            mrOutputOperator = mapSideCombineReduceOp;
+        }
+
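+        // With reduce tasks configured, shuffle the map output through an M-to-N
+        // hash-partitioning connector into a per-partition sort feeding the reducer.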
+        if (jobConf.getNumReduceTasks() != 0) {
+            IOperatorDescriptor sorter = getSorter(jobConf, spec);
+            HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
+
+            PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(jobConf.getNumReduceTasks());
+            sorter.setPartitionConstraint(reducerPartitionConstraint);
+            reducer.setPartitionConstraint(reducerPartitionConstraint);
+
+            IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
+            spec.connect(mToNConnectorDescriptor, mapOutputOperator, 0, sorter, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
+            mrOutputOperator = reducer;
+        }
+        return mrOutputOperator;
+    }
+
+    public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
+        JobSpecification spec = new JobSpecification();
+        Iterator<JobConf> iterator = jobConfs.iterator();
+        JobConf firstMR = iterator.next();
+        InputSplit[] splits = getInputSplits(firstMR);
+        IOperatorDescriptor reader = new HadoopReadOperatorDescriptor(firstMR, spec, splits);
+        IOperatorDescriptor outputOperator = reader;
+        outputOperator = addMRToExistingPipeline(reader, firstMR, spec, splits);
+        while (iterator.hasNext()) {
+            JobConf currentJobConf = iterator.next();
+            outputOperator = addMRToExistingPipeline(outputOperator, currentJobConf, spec, null);
+        }
+        configureOutput(outputOperator, jobConfs.get(jobConfs.size() - 1), spec);
+        return spec;
+    }
+
+    public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+        JobSpecification spec = new JobSpecification();
+        InputSplit[] splits = getInputSplits(conf);
+        HadoopReadOperatorDescriptor hadoopReadOperatorDescriptor = new HadoopReadOperatorDescriptor(conf, spec, splits);
+        IOperatorDescriptor mrOutputOperator = addMRToExistingPipeline(hadoopReadOperatorDescriptor, conf, spec, splits);
+        IOperatorDescriptor printer = configureOutput(mrOutputOperator, conf, spec);
+        spec.addRoot(printer);
+        return spec;
+    }
+
+    public static InMemorySortOperatorDescriptor getSorter(JobConf conf, JobSpecification spec) {
+        InMemorySortOperatorDescriptor inMemorySortOp = null;
+        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
+                .getMapOutputValueClass().getName());
+        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
+                WritableComparable.class));
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+                writableComparator.getClass());
+        inMemorySortOp = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
+        return inMemorySortOp;
+    }
+
+    public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
+            JobSpecification spec) {
+
+        Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+        Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+        ITuplePartitionComputerFactory factory = null;
+        if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
+            // A custom (non-Hadoop) partitioner drives the tuple partitioning.
+            Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
+            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner,
+                    DatatypeHelper.createSerializerDeserializer(mapOutputKeyClass),
+                    DatatypeHelper.createSerializerDeserializer(mapOutputValueClass));
+        } else {
+            // Default: hash on the serialized map output key.
+            ISerializerDeserializer mapOutputKeySerializerDeserializer = DatatypeHelper
+                    .createSerializerDeserializer(mapOutputKeyClass);
+            factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDeserializer);
+        }
+        return new MToNHashPartitioningConnectorDescriptor(spec, factory);
+    }
+
+}
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
new file mode 100644
index 0000000..d6bd652
--- /dev/null
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Properties;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+public class Utilities {
+
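+    // Reads a flat key/value file, splitting each line at the first occurrence of
+    // the given delimiter; lines without the delimiter are skipped.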
+    public static Properties getProperties(String filePath, char delimiter) {
+        Properties properties = new Properties();
+        try {
+            FileInputStream fins = new FileInputStream(new File(filePath));
+            DataInputStream dins = new DataInputStream(fins);
+            BufferedReader br = new BufferedReader(new InputStreamReader(dins));
+            try {
+                String strLine;
+                while ((strLine = br.readLine()) != null) {
+                    int split = strLine.indexOf(delimiter);
+                    if (split >= 0) {
+                        properties.put(strLine.substring(0, split).trim(), strLine.substring(split + 1).trim());
+                    }
+                }
+            } finally {
+                br.close();
+            }
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+        return properties;
+    }
+
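+    // Bundles the given jars into "<applicationName>.zip" in the working directory;
+    // this archive is uploaded when the Hyracks application is created and is marked
+    // deleteOnExit so it is cleaned up when the client exits.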
+    public static File getHyracksArchive(String applicationName, String[] libJars) {
+        String target = applicationName + ".zip";
+        // Create a buffer for reading the files
+        byte[] buf = new byte[1024];
+        try {
+            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
+            for (int i = 0; i < libJars.length; i++) {
+                String fileName = libJars[i].substring(libJars[i].lastIndexOf("/") + 1);
+                FileInputStream in = new FileInputStream(libJars[i]);
+                out.putNextEntry(new ZipEntry(fileName));
+                int len;
+                while ((len = in.read(buf)) > 0) {
+                    out.write(buf, 0, len);
+                }
+                out.closeEntry();
+                in.close();
+            }
+            out.close();
+        } catch (IOException e) {
+            // Surface packaging failures instead of silently returning a missing or partial archive.
+            e.printStackTrace();
+        }
+        File har = new File(target);
+        har.deleteOnExit();
+        return har;
+    }
+
+}