Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/pom.xml b/fullstack/hyracks/hyracks-hadoop-compat/pom.xml
new file mode 100644
index 0000000..353ec06
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/pom.xml
@@ -0,0 +1,88 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-hadoop-compat</artifactId>
+  <version>0.2.2-SNAPSHOT</version>
+  <name>hyracks-hadoop-compat</name>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.hadoop.compat.driver.CompatibilityLayer</mainClass>
+                  <name>hadoop-compat</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-core</artifactId>
+  		<version>0.20.2</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+    <dependency>
+        <groupId>edu.uci.ics.dcache</groupId>
+        <artifactId>dcache-client</artifactId>
+        <version>0.0.1</version>
+        <scope>compile</scope>
+    </dependency>
+    <dependency>
+    	<groupId>edu.uci.ics.hyracks</groupId>
+    	<artifactId>hyracks-dataflow-hadoop</artifactId>
+    	<version>0.2.2-SNAPSHOT</version>
+    	<type>jar</type>
+    	<scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml b/fullstack/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
new file mode 100644
index 0000000..dfd229a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -0,0 +1,86 @@
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
+public class HyracksClient {
+
+    private static HyracksConnection connection;
+    private static final String jobProfilingKey = "jobProfilingKey";
+    Set<String> systemLibs;
+
+    public HyracksClient(Properties clusterProperties) throws Exception {
+        initialize(clusterProperties);
+    }
+
+    private void initialize(Properties properties) throws Exception {
+        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+        connection = new HyracksConnection(clusterController, 1098);
+        systemLibs = new HashSet<String>();
+        for (String systemLib : ConfigurationConstants.systemLibs) {
+            String systemLibPath = properties.getProperty(systemLib);
+            if (systemLibPath != null) {
+                systemLibs.add(systemLibPath);
+            }
+        }
+    }
+
+    public HyracksClient(String clusterConf, char delimiter) throws Exception {
+        Properties properties = Utilities.getProperties(clusterConf, delimiter);
+        initialize(properties);
+    }
+
+    private Set<String> getRequiredLibs(Set<String> userLibs) {
+        Set<String> requiredLibs = new HashSet<String>();
+        for (String systemLib : systemLibs) {
+            requiredLibs.add(systemLib);
+        }
+        for (String userLib : userLibs) {
+            requiredLibs.add(userLib);
+        }
+        return requiredLibs;
+    }
+
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return connection.getJobStatus(jobId);
+    }
+
+    private void createApplication(String applicationName, Set<String> userLibs) throws Exception {
+        connection.createApplication(applicationName,
+                Utilities.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+    }
+
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception {
+        String jobProfilingVal = System.getenv(jobProfilingKey);
+        boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+        JobId jobId;
+        if (doProfiling) {
+            System.out.println("PROFILING");
+            jobId = connection.startJob(applicationName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        } else {
+            jobId = connection.startJob(applicationName, spec);
+        }
+        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+        return runningJob;
+    }
+
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs)
+            throws Exception {
+        createApplication(applicationName, userLibs);
+        return submitJob(applicationName, spec);
+    }
+
+    public void waitForCompleton(JobId jobId) throws Exception {
+        connection.waitForCompletion(jobId);
+    }
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
new file mode 100644
index 0000000..776fd54
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -0,0 +1,158 @@
+package edu.uci.ics.hyracks.hadoop.compat.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksRunningJob implements RunningJob {
+
+    JobId jobId;
+    IOperatorDescriptorRegistry spec;
+    HyracksClient hyracksClient;
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    public IOperatorDescriptorRegistry getSpec() {
+        return spec;
+    }
+
+    public void setSpec(IOperatorDescriptorRegistry spec) {
+        this.spec = spec;
+    }
+
+    public HyracksRunningJob(JobId jobId, IOperatorDescriptorRegistry jobSpec, HyracksClient hyracksClient) {
+        this.spec = jobSpec;
+        this.jobId = jobId;
+        this.hyracksClient = hyracksClient;
+    }
+
+    @Override
+    public float cleanupProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public Counters getCounters() throws IOException {
+        return new Counters();
+    }
+
+    @Override
+    public JobID getID() {
+        return new JobID(this.jobId.toString(), 1);
+    }
+
+    @Override
+    public String getJobFile() {
+        return "";
+    }
+
+    @Override
+    public String getJobID() {
+        return this.jobId.toString();
+    }
+
+    @Override
+    public String getJobName() {
+        return this.jobId.toString();
+    }
+
+    @Override
+    public int getJobState() throws IOException {
+        return isComplete() ? 2 : 1;
+    }
+
+    @Override
+    public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException {
+        return new TaskCompletionEvent[0];
+    }
+
+    @Override
+    public String getTrackingURL() {
+        return " running on hyrax, remote kill is not supported ";
+    }
+
+    @Override
+    public boolean isComplete() throws IOException {
+        edu.uci.ics.hyracks.api.job.JobStatus status = null;
+        try {
+            status = hyracksClient.getJobStatus(jobId);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return status.equals(edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED);
+    }
+
+    @Override
+    public boolean isSuccessful() throws IOException {
+        return isComplete();
+    }
+
+    @Override
+    public void killJob() throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void killTask(String taskId, boolean shouldFail) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public float mapProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public float reduceProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void setJobPriority(String priority) throws IOException {
+
+    }
+
+    @Override
+    public float setupProgress() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void waitForCompletion() throws IOException {
+        while (!isComplete()) {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException ie) {
+            }
+        }
+    }
+
+    @Override
+    public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void killTask(TaskAttemptID arg0, boolean arg1) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
new file mode 100644
index 0000000..bf09bb0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -0,0 +1,190 @@
+package edu.uci.ics.hyracks.hadoop.compat.driver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
+import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
+import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+
+public class CompatibilityLayer {
+
+    HyracksClient hyracksClient;
+    DCacheHandler dCacheHander = null;
+    Properties clusterConf;
+    HadoopAdapter hadoopAdapter;
+
+    private static char configurationFileDelimiter = '=';
+    private static final String dacheKeyPrefix = "dcache.key";
+
+    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+        initialize(clConfig);
+    }
+
+    private void initialize(CompatibilityConfig clConfig) throws Exception {
+        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+        hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
+        hyracksClient = new HyracksClient(clusterConf);
+        dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+    }
+
+    public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        jobConfs.add(conf);
+        String applicationName = conf.getJobName() + System.currentTimeMillis();
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+        return hyracksRunningJob;
+    }
+
+    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
+            throws Exception {
+        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+        populateDCache(jobFiles[0]);
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+        return hyracksRunningJob;
+    }
+
+    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
+        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+        populateDCache(jobFiles[0]);
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec);
+        return hyracksRunningJob;
+    }
+
+    private void populateDCache(String jobFile) throws IOException {
+        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+        String tempDir = "/tmp";
+        if (dcacheTasks.size() > 0) {
+            for (String key : dcacheTasks.keySet()) {
+                String destPath = tempDir + "/" + key + System.currentTimeMillis();
+                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+                System.out.println(" source :" + dcacheTasks.get(key));
+                System.out.println(" dest :" + destPath);
+                System.out.println(" key :" + key);
+                System.out.println(" value :" + destPath);
+                dCacheHander.put(key, destPath);
+            }
+        }
+    }
+
+    private String getApplicationNameForHadoopJob(JobConf jobConf) {
+        String jar = jobConf.getJar();
+        if (jar != null) {
+            return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0);
+        } else {
+            return "" + System.currentTimeMillis();
+        }
+    }
+
+    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+        Map<String, String> foundProperties = new HashMap<String, String>();
+        Set<Entry<Object, Object>> entrySet = properties.entrySet();
+        for (Entry entry : entrySet) {
+            String key = (String) entry.getKey();
+            String value = (String) entry.getValue();
+            if ((key.startsWith(prefix))) {
+                String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+                foundProperties.put(actualKey, value);
+            }
+        }
+        return foundProperties;
+    }
+
+    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, ',');
+        Map<String, String> dcacheTasks = new HashMap<String, String>();
+        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
+        for (String key : dcacheKeys.keySet()) {
+            String sourcePath = dcacheKeys.get(key);
+            if (sourcePath != null) {
+                dcacheTasks.put(key, sourcePath);
+            }
+        }
+        return dcacheTasks;
+    }
+
+    public void waitForCompletion(JobId jobId) throws Exception {
+        hyracksClient.waitForCompleton(jobId);
+    }
+
+    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        for (String jobFile : jobFiles) {
+            jobConfs.add(constructHadoopJobConf(jobFile));
+        }
+        return jobConfs;
+    }
+
+    private JobConf constructHadoopJobConf(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, '=');
+        JobConf conf = new JobConf(hadoopAdapter.getConf());
+        for (Entry entry : jobProperties.entrySet()) {
+            conf.set((String) entry.getKey(), (String) entry.getValue());
+            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+        }
+        return conf;
+    }
+
+    private String[] getJobs(CompatibilityConfig clConfig) {
+        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+    }
+
+    public static void main(String args[]) throws Exception {
+        long startTime = System.nanoTime();
+        CompatibilityConfig clConfig = new CompatibilityConfig();
+        CmdLineParser cp = new CmdLineParser(clConfig);
+        try {
+            cp.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            cp.printUsage(System.err);
+            return;
+        }
+        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+        String applicationName = clConfig.applicationName;
+        String[] jobFiles = compatLayer.getJobs(clConfig);
+        String[] userLibraries = null;
+        if (clConfig.userLibs != null) {
+            userLibraries = clConfig.userLibs.split(",");
+        }
+        try {
+            HyracksRunningJob hyraxRunningJob = null;
+            if (userLibraries != null) {
+                Set<String> userLibs = new HashSet<String>();
+                for (String userLib : userLibraries) {
+                    userLibs.add(userLib);
+                }
+                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
+            } else {
+                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
+            }
+            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+            long end_time = System.nanoTime();
+            System.out.println("TOTAL TIME (from Launch to Completion):"
+                    + ((end_time - startTime) / (float) 1000000000.0) + " seconds.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
new file mode 100644
index 0000000..6d94bc7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import org.kohsuke.args4j.Option;
+
+public class CompatibilityConfig {
+
+	@Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
+			+ " (1) Address of HyracksClusterController service"
+			+ " (2) Address of Hadoop namenode service")
+	public String clusterConf;
+
+	@Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
+			+ "Each job file defines the hadoop job + "
+			+ "The order in the list defines the sequence in which"
+			+ "the jobs are to be executed")
+	public String jobFiles;
+
+	@Option(name = "-applicationName", usage = " The application as part of which the job executes")
+	public String applicationName;
+
+	@Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running ")
+	public String userLibs;
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
new file mode 100644
index 0000000..281e58b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+public class ConfigurationConstants {
+
+    public static final String clusterControllerHost = "clusterControllerHost";
+    public static final String namenodeURL = "fs.default.name";
+    public static final String dcacheServerConfiguration = "dcacheServerConfiguration";
+    public static final String[] systemLibs = new String[] { "hyracksDataflowStdLib", "hyracksDataflowCommonLib",
+            "hyracksDataflowHadoopLib", "hadoopCoreLib"};
+
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
new file mode 100644
index 0000000..045cd76
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+
+public class DCacheHandler {
+
+    String clientPropertyFile;
+    String key;
+    String valueFilePath;
+
+    public enum Operation {
+        GET,
+        PUT,
+        DELETE
+    }
+
+    private static DCacheClient dCacheClient;
+    private static final String[] operations = { "GET", "PUT", "DELETE" };
+
+    public DCacheHandler(String clientPropertyFile) throws Exception {
+        this.clientPropertyFile = clientPropertyFile;
+        init();
+    }
+
+    private void init() throws Exception {
+        dCacheClient = DCacheClient.get();
+        DCacheClientConfig dcacheClientConfig = new DCacheClientConfig();
+        dCacheClient.init(dcacheClientConfig);
+    }
+
+    public static DCacheClient getInstance(String clientPropertyFile) {
+        if (dCacheClient == null) {
+            dCacheClient = DCacheClient.get();
+        }
+        return dCacheClient;
+    }
+
+    public String getClientPropertyFile() {
+        return clientPropertyFile;
+    }
+
+    public void put(String key, String value) throws IOException {
+        dCacheClient.set(key, value);
+        System.out.println(" added to cache " + key + " : " + value);
+    }
+
+    public Object get(String key) throws IOException {
+        return dCacheClient.get(key);
+    }
+
+    public void delete(String key) throws IOException {
+        dCacheClient.delete(key);
+    }
+
+    public Object performOperation(String operation, String[] args) throws Exception {
+        Object returnValue = null;
+        int operationIndex = getOperation(operation);
+        switch (operationIndex) {
+            case 0:
+                returnValue = dCacheClient.get(args[2]);
+                System.out.println(" get from cache " + returnValue);
+                break;
+            case 1:
+                dCacheClient.set(args[2], args[3]);
+                System.out.println(" added to cache " + args[2] + " : " + args[3]);
+                break;
+            case 2:
+                dCacheClient.delete(args[2]);
+                System.out.println(" removed from cache " + args[2]);
+                break;
+            default:
+                System.out.println("Error : Operation not supported !");
+                break;
+        }
+        return returnValue;
+    }
+
+    private int getOperation(String operation) {
+        for (int i = 0; i < operations.length; i++) {
+            if (operations[i].equalsIgnoreCase(operation)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
new file mode 100644
index 0000000..1d6369a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -0,0 +1,408 @@
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+
+public class HadoopAdapter {
+
+	public static final String FS_DEFAULT_NAME = "fs.default.name";
+	private JobConf jobConf;
+	private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
+	public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
+	public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+	public static final int DEFAULT_MAX_MAPPERS = 40;
+	public static final int DEFAULT_MAX_REDUCERS = 40;
+	public static final String MAX_MAPPERS_KEY = "maxMappers";
+	public static final String MAX_REDUCERS_KEY = "maxReducers";
+	public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
+
+	private int maxMappers = DEFAULT_MAX_MAPPERS;
+	private int maxReducers = DEFAULT_MAX_REDUCERS;
+	private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
+
+	class NewHadoopConstants {
+		public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+		public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+		public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+		public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+		public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+		public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+	}
+
+	public HadoopAdapter(String namenodeUrl) {
+		jobConf = new JobConf(true);
+		jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+		if (System.getenv(MAX_MAPPERS_KEY) != null) {
+			maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+		}
+		if (System.getenv(MAX_REDUCERS_KEY) != null) {
+			maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+		}
+		if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+			exSortFrame = Integer.parseInt(System
+					.getenv(EX_SORT_FRAME_LIMIT_KEY));
+		}
+	}
+
+	private String getEnvironmentVariable(String key, String def) {
+		String ret = System.getenv(key);
+		return ret != null ? ret : def;
+	}
+
+	public JobConf getConf() {
+		return jobConf;
+	}
+
+	public static VersionedProtocol getProtocol(Class protocolClass,
+			InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
+		VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
+				ClientProtocol.versionID, inetAddress, jobConf);
+		return versionedProtocol;
+	}
+
+	private static RecordDescriptor getHadoopRecordDescriptor(
+			String className1, String className2) {
+		RecordDescriptor recordDescriptor = null;
+		try {
+			recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+					(Class<? extends Writable>) Class.forName(className1),
+					(Class<? extends Writable>) Class.forName(className2));
+		} catch (ClassNotFoundException cnfe) {
+			cnfe.printStackTrace();
+		}
+		return recordDescriptor;
+	}
+
+	private Object[] getInputSplits(JobConf conf) throws IOException,
+			ClassNotFoundException, InterruptedException {
+		if (conf.getUseNewMapper()) {
+			return getNewInputSplits(conf);
+		} else {
+			return getOldInputSplits(conf);
+		}
+	}
+
+	private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
+			JobConf conf) throws ClassNotFoundException, IOException,
+			InterruptedException {
+		org.apache.hadoop.mapreduce.InputSplit[] splits = null;
+		JobContext context = new JobContext(conf, null);
+		org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
+				.newInstance(context.getInputFormatClass(), conf);
+		List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
+				.getSplits(context);
+		return inputSplits
+				.toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
+	}
+
+	private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
+		InputFormat inputFormat = conf.getInputFormat();
+		return inputFormat.getSplits(conf, conf.getNumMapTasks());
+	}
+
+	private void configurePartitionCountConstraint(JobSpecification spec,
+			IOperatorDescriptor operator, int instanceCount) {
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+				instanceCount);
+		operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
+	}
+
+	public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
+			JobSpecification spec, IOperatorDescriptor previousOp)
+			throws Exception {
+		boolean selfRead = previousOp == null;
+		IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+		HadoopMapperOperatorDescriptor mapOp = null;
+		if (selfRead) {
+			Object[] splits = getInputSplits(conf, maxMappers);
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+					classFactory);
+			configurePartitionCountConstraint(spec, mapOp, splits.length);
+		} else {
+			configurePartitionCountConstraint(spec, mapOp,
+					getInstanceCount(previousOp));
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
+			spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
+					mapOp, 0);
+		}
+		return mapOp;
+	}
+
+	public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
+			IOperatorDescriptorRegistry spec, boolean useAsCombiner) {
+		HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
+				spec, conf, null, new ClasspathBasedHadoopClassFactory(),
+				useAsCombiner);
+		return reduceOp;
+	}
+
+	public FileSystem getHDFSClient() {
+		FileSystem fileSystem = null;
+		try {
+			fileSystem = FileSystem.get(jobConf);
+		} catch (IOException ioe) {
+			ioe.printStackTrace();
+		}
+		return fileSystem;
+	}
+
+	public JobSpecification getJobSpecification(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = null;
+		if (jobConfs.size() == 1) {
+			spec = getJobSpecification(jobConfs.get(0));
+		} else {
+			spec = getPipelinedSpec(jobConfs);
+		}
+		return spec;
+	}
+
+	private IOperatorDescriptor configureOutput(
+			IOperatorDescriptor previousOperator, JobConf conf,
+			JobSpecification spec) throws Exception {
+		int instanceCountPreviousOperator = operatorInstanceCount
+				.get(previousOperator.getOperatorId());
+		int numOutputters = conf.getNumReduceTasks() != 0 ? conf
+				.getNumReduceTasks() : instanceCountPreviousOperator;
+		HadoopWriteOperatorDescriptor writer = null;
+		writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+		configurePartitionCountConstraint(spec, writer, numOutputters);
+		spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
+				0, writer, 0);
+		return writer;
+	}
+
+	private int getInstanceCount(IOperatorDescriptor operator) {
+		return operatorInstanceCount.get(operator.getOperatorId());
+	}
+
+	private IOperatorDescriptor addCombiner(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		boolean useCombiner = (jobConf.getCombinerClass() != null);
+		IOperatorDescriptor mapSideOutputOp = previousOperator;
+		if (useCombiner) {
+			System.out.println("Using Combiner:"
+					+ jobConf.getCombinerClass().getName());
+			IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
+					jobConf, spec);
+			configurePartitionCountConstraint(spec, mapSideCombineSortOp,
+					getInstanceCount(previousOperator));
+
+			HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
+					jobConf, spec, true);
+			configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
+					getInstanceCount(previousOperator));
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					previousOperator, 0, mapSideCombineSortOp, 0);
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+			mapSideOutputOp = mapSideCombineReduceOp;
+		}
+		return mapSideOutputOp;
+	}
+
+	private int getNumReduceTasks(JobConf jobConf) {
+		int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
+		return numReduceTasks;
+	}
+
+	private IOperatorDescriptor addReducer(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		IOperatorDescriptor mrOutputOperator = previousOperator;
+		if (jobConf.getNumReduceTasks() != 0) {
+			IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
+			HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
+					false);
+			int numReduceTasks = getNumReduceTasks(jobConf);
+			configurePartitionCountConstraint(spec, sorter, numReduceTasks);
+			configurePartitionCountConstraint(spec, reducer, numReduceTasks);
+
+			IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
+					jobConf, spec);
+			spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
+					0);
+			spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+					reducer, 0);
+			mrOutputOperator = reducer;
+		}
+		return mrOutputOperator;
+	}
+
+	private long getInputSize(Object[] splits, JobConf conf)
+			throws IOException, InterruptedException {
+		long totalInputSize = 0;
+		if (conf.getUseNewMapper()) {
+			for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		} else {
+			for (InputSplit split : (InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		}
+		return totalInputSize;
+	}
+
+	private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
+			throws Exception {
+		Object[] splits = getInputSplits(conf);
+		if (splits.length > desiredMaxMappers) {
+			long totalInputSize = getInputSize(splits, conf);
+			long goalSize = (totalInputSize / desiredMaxMappers);
+			conf.setLong("mapred.min.split.size", goalSize);
+			conf.setNumMapTasks(desiredMaxMappers);
+			splits = getInputSplits(conf);
+		}
+		return splits;
+	}
+
+	public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = new JobSpecification();
+		Iterator<JobConf> iterator = jobConfs.iterator();
+		JobConf firstMR = iterator.next();
+		IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
+		while (iterator.hasNext())
+			for (JobConf currentJobConf : jobConfs) {
+				mrOutputOp = configureMapReduce(mrOutputOp, spec,
+						currentJobConf);
+			}
+		configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
+		return spec;
+	}
+
+	public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+		JobSpecification spec = new JobSpecification();
+		IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
+		IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
+		spec.addRoot(printer);
+		System.out.println(spec);
+		return spec;
+	}
+
+	private IOperatorDescriptor configureMapReduce(
+			IOperatorDescriptor previousOuputOp, JobSpecification spec,
+			JobConf conf) throws Exception {
+		IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp);
+		IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
+		IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+		return reducer;
+	}
+
+	public static InMemorySortOperatorDescriptor getInMemorySorter(
+			JobConf conf, IOperatorDescriptorRegistry spec) {
+		InMemorySortOperatorDescriptor inMemorySortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return inMemorySortOp;
+	}
+
+	public static ExternalSortOperatorDescriptor getExternalSorter(
+			JobConf conf, IOperatorDescriptorRegistry spec) {
+		ExternalSortOperatorDescriptor externalSortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
+				HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return externalSortOp;
+	}
+
+	public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+			JobConf conf, IConnectorDescriptorRegistry spec) {
+
+		Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+		Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+		MToNPartitioningConnectorDescriptor connectorDescriptor = null;
+		ITuplePartitionComputerFactory factory = null;
+		conf.getMapOutputKeyClass();
+		if (conf.getPartitionerClass() != null
+				&& !conf.getPartitionerClass().getName().startsWith(
+						"org.apache.hadoop")) {
+			Class<? extends Partitioner> partitioner = conf
+					.getPartitionerClass();
+			factory = new HadoopPartitionerTuplePartitionComputerFactory(
+					partitioner, DatatypeHelper
+							.createSerializerDeserializer(mapOutputKeyClass),
+					DatatypeHelper
+							.createSerializerDeserializer(mapOutputValueClass));
+		} else {
+			RecordDescriptor recordDescriptor = DatatypeHelper
+					.createKeyValueRecordDescriptor(mapOutputKeyClass,
+							mapOutputValueClass);
+			ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
+					.createSerializerDeserializer(mapOutputKeyClass);
+			factory = new HadoopHashTuplePartitionComputerFactory(
+					mapOutputKeySerializerDerserializer);
+		}
+		connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
+				factory);
+		return connectorDescriptor;
+	}
+
+}
diff --git a/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
new file mode 100644
index 0000000..4110f04
--- /dev/null
+++ b/fullstack/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -0,0 +1,113 @@
+package edu.uci.ics.hyracks.hadoop.compat.util;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+public class Utilities {
+
+    public static Properties getProperties(String filePath, char delimiter) {
+        Properties properties = new Properties();
+        try {
+            FileInputStream fins = new FileInputStream(new File(filePath));
+            DataInputStream dins = new DataInputStream(fins);
+            BufferedReader br = new BufferedReader(new InputStreamReader(dins));
+            String strLine;
+            while ((strLine = br.readLine()) != null) {
+                int split = strLine.indexOf(delimiter);
+                if (split >= 0) {
+                    properties.put((strLine.substring(0, split)).trim(), strLine.substring(split + 1, strLine.length())
+                            .trim());
+                }
+            }
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+        return properties;
+    }
+
+    public static File getHyracksArchive(String applicationName, Set<String> libJars) {
+       String target = applicationName + ".zip";
+        // Create a buffer for reading the files
+        byte[] buf = new byte[1024];
+        Set<String> fileNames = new HashSet<String>();
+        try {
+            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
+            for (String libJar : libJars) {
+                String fileName = libJar.substring(libJar.lastIndexOf("/") + 1);
+                if(fileNames.contains(fileName)){
+                    continue;
+                }
+                FileInputStream in = new FileInputStream(libJar);
+                out.putNextEntry(new ZipEntry(fileName));
+                int len;
+                while ((len = in.read(buf)) > 0) {
+                    out.write(buf, 0, len);
+                }
+                out.closeEntry();
+                in.close();
+                fileNames.add(fileName);
+            }
+            out.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        File har = new File(target);
+        har.deleteOnExit();
+        return har;
+    }
+    
+    public static Reporter createReporter() {
+        Reporter reporter = new Reporter() {
+
+            @Override
+            public void progress() {
+
+            }
+
+            @Override
+            public void setStatus(String arg0) {
+
+            }
+
+            @Override
+            public void incrCounter(String arg0, String arg1, long arg2) {
+
+            }
+
+            @Override
+            public void incrCounter(Enum<?> arg0, long arg1) {
+
+            }
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String arg0, String arg1) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> arg0) {
+                return null;
+            }
+        };
+        return reporter;
+    }
+}