Merged hyracks_dev_next -r 1287 into trunk

In hyracks-hadoop-compat, this merge:
- removes the checked-in Eclipse metadata (.classpath, .project, .settings);
- bumps the module and parent versions from 0.1.9-SNAPSHOT to 0.2.0-SNAPSHOT;
- ports HyracksClient from HyracksRMIConnection on port 1099 to HyracksConnection on port 1098;
- replaces java.util.UUID job identifiers with edu.uci.ics.hyracks.api.job.JobId in the compat client and driver;
- renames MToNHashPartitioningConnectorDescriptor to MToNPartitioningConnectorDescriptor in HadoopAdapter;
- reformats HyracksClient, HyracksRunningJob, and CompatibilityLayer from tabs to 4-space indentation.

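Illustrative only, not part of this patch: a minimal sketch of how a caller moves to the new client API, using only the calls exercised in the patch below. The host name, class name, and application name are placeholders, and the no-arg JobSpecification constructor is an assumption.

    import edu.uci.ics.hyracks.api.client.HyracksConnection;
    import edu.uci.ics.hyracks.api.job.JobId;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    // Hypothetical example class, assuming the application "exampleApp" has
    // already been deployed via connection.createApplication(...).
    public class SubmitExample {
        public static void main(String[] args) throws Exception {
            // Before this merge: new HyracksRMIConnection(host, 1099) and UUID job ids.
            // After: HyracksConnection on port 1098 and JobId handles.
            HyracksConnection connection = new HyracksConnection("localhost", 1098); // placeholder host
            JobSpecification spec = new JobSpecification(); // assumed constructor; add operators and connectors here
            JobId jobId = connection.createJob("exampleApp", spec); // placeholder application name
            connection.start(jobId);
            connection.waitForCompletion(jobId);
        }
    }
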
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hadoop-compat/.classpath b/hyracks/hyracks-hadoop-compat/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-hadoop-compat/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-hadoop-compat/.project b/hyracks/hyracks-hadoop-compat/.project
deleted file mode 100644
index 7d50383..0000000
--- a/hyracks/hyracks-hadoop-compat/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>hyracks-hadoop-compat</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index dfac000..0000000
--- a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri May 20 19:34:05 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index e03a9fc..0000000
--- a/hyracks/hyracks-hadoop-compat/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Tue Oct 19 11:05:30 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
index 6a6d921..2de7e04 100644
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-hadoop-compat</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -79,7 +79,7 @@
     <dependency>
     	<groupId>edu.uci.ics.hyracks</groupId>
     	<artifactId>hyracks-dataflow-hadoop</artifactId>
-    	<version>0.1.9-SNAPSHOT</version>
+    	<version>0.2.0-SNAPSHOT</version>
     	<type>jar</type>
     	<scope>compile</scope>
     </dependency>
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index a363221..eadf679 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -4,10 +4,10 @@
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
-import java.util.UUID;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
@@ -15,77 +15,73 @@
 
 public class HyracksClient {
 
-	private static HyracksRMIConnection connection;
-	private static final String jobProfilingKey = "jobProfilingKey";
-	Set<String> systemLibs;
+    private static HyracksConnection connection;
+    private static final String jobProfilingKey = "jobProfilingKey";
+    Set<String> systemLibs;
 
-	public HyracksClient(Properties clusterProperties) throws Exception {
-		initialize(clusterProperties);
-	}
+    public HyracksClient(Properties clusterProperties) throws Exception {
+        initialize(clusterProperties);
+    }
 
-	private void initialize(Properties properties) throws Exception {
-		String clusterController = (String) properties
-				.get(ConfigurationConstants.clusterControllerHost);
-		connection = new HyracksRMIConnection(clusterController, 1099);
-		systemLibs = new HashSet<String>();
-		for (String systemLib : ConfigurationConstants.systemLibs) {
-			String systemLibPath = properties.getProperty(systemLib);
-			if (systemLibPath != null) {
-				systemLibs.add(systemLibPath);
-			}
-		}
-	}
+    private void initialize(Properties properties) throws Exception {
+        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
+        connection = new HyracksConnection(clusterController, 1098);
+        systemLibs = new HashSet<String>();
+        for (String systemLib : ConfigurationConstants.systemLibs) {
+            String systemLibPath = properties.getProperty(systemLib);
+            if (systemLibPath != null) {
+                systemLibs.add(systemLibPath);
+            }
+        }
+    }
 
-	public HyracksClient(String clusterConf, char delimiter) throws Exception {
-		Properties properties = Utilities.getProperties(clusterConf, delimiter);
-		initialize(properties);
-	}
+    public HyracksClient(String clusterConf, char delimiter) throws Exception {
+        Properties properties = Utilities.getProperties(clusterConf, delimiter);
+        initialize(properties);
+    }
 
-	private Set<String> getRequiredLibs(Set<String> userLibs) {
-		Set<String> requiredLibs = new HashSet<String>();
-		for (String systemLib : systemLibs) {
-			requiredLibs.add(systemLib);
-		}
-		for (String userLib : userLibs) {
-			requiredLibs.add(userLib);
-		}
-		return requiredLibs;
-	}
+    private Set<String> getRequiredLibs(Set<String> userLibs) {
+        Set<String> requiredLibs = new HashSet<String>();
+        for (String systemLib : systemLibs) {
+            requiredLibs.add(systemLib);
+        }
+        for (String userLib : userLibs) {
+            requiredLibs.add(userLib);
+        }
+        return requiredLibs;
+    }
 
-	public JobStatus getJobStatus(UUID jobId) throws Exception {
-		return connection.getJobStatus(jobId);
-	}
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return connection.getJobStatus(jobId);
+    }
 
-	private void createApplication(String applicationName, Set<String> userLibs)
-			throws Exception {
-		connection.createApplication(applicationName, Utilities
-				.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
-	}
+    private void createApplication(String applicationName, Set<String> userLibs) throws Exception {
+        connection.createApplication(applicationName,
+                Utilities.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+    }
 
-	public HyracksRunningJob submitJob(String applicationName,
-			JobSpecification spec) throws Exception {
-		String jobProfilingVal = System.getenv(jobProfilingKey);
-		boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
-		UUID jobId;
-		if (doProfiling) {
-			System.out.println("PROFILING");
-			jobId = connection.createJob(applicationName, spec, EnumSet
-					.of(JobFlag.PROFILE_RUNTIME));
-		} else {
-			jobId = connection.createJob(applicationName, spec);
-		}
-		connection.start(jobId);
-		HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
-		return runningJob;
-	}
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec) throws Exception {
+        String jobProfilingVal = System.getenv(jobProfilingKey);
+        boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+        JobId jobId;
+        if (doProfiling) {
+            System.out.println("PROFILING");
+            jobId = connection.createJob(applicationName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        } else {
+            jobId = connection.createJob(applicationName, spec);
+        }
+        connection.start(jobId);
+        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+        return runningJob;
+    }
 
-	public HyracksRunningJob submitJob(String applicationName,
-			JobSpecification spec, Set<String> userLibs) throws Exception {
-		createApplication(applicationName, userLibs);
-		return submitJob(applicationName, spec);
-	}
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> userLibs)
+            throws Exception {
+        createApplication(applicationName, userLibs);
+        return submitJob(applicationName, spec);
+    }
 
-	public void waitForCompleton(UUID jobId) throws Exception {
-		connection.waitForCompletion(jobId);
-	}
+    public void waitForCompletion(JobId jobId) throws Exception {
+        connection.waitForCompletion(jobId);
+    }
 }
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
index 8470e12..7c650ec 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
 import java.io.IOException;
-import java.util.UUID;
 
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobID;
@@ -9,19 +8,20 @@
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class HyracksRunningJob implements RunningJob {
 
-    UUID jobId;
+    JobId jobId;
     JobSpecification spec;
     HyracksClient hyracksClient;
 
-    public UUID getJobId() {
+    public JobId getJobId() {
         return jobId;
     }
 
-    public void setJobId(UUID jobId) {
+    public void setJobId(JobId jobId) {
         this.jobId = jobId;
     }
 
@@ -33,7 +33,7 @@
         this.spec = spec;
     }
 
-    public HyracksRunningJob(UUID jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
+    public HyracksRunningJob(JobId jobId, JobSpecification jobSpec, HyracksClient hyracksClient) {
         this.spec = jobSpec;
         this.jobId = jobId;
         this.hyracksClient = hyracksClient;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 37f4d34..bf09bb0 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -6,15 +6,16 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.UUID;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.kohsuke.args4j.CmdLineParser;
 
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
 import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
@@ -22,187 +23,168 @@
 import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
 import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
 import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class CompatibilityLayer {
 
-	HyracksClient hyracksClient;
-	DCacheHandler dCacheHander = null;
-	Properties clusterConf;
-	HadoopAdapter hadoopAdapter;
+    HyracksClient hyracksClient;
+    DCacheHandler dCacheHander = null;
+    Properties clusterConf;
+    HadoopAdapter hadoopAdapter;
 
-	private static char configurationFileDelimiter = '=';
-	private static final String dacheKeyPrefix = "dcache.key";
+    private static char configurationFileDelimiter = '=';
+    private static final String dacheKeyPrefix = "dcache.key";
 
-	public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
-		initialize(clConfig);
-	}
+    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+        initialize(clConfig);
+    }
 
-	private void initialize(CompatibilityConfig clConfig) throws Exception {
-		clusterConf = Utilities.getProperties(clConfig.clusterConf,
-				configurationFileDelimiter);
-		hadoopAdapter = new HadoopAdapter(clusterConf
-				.getProperty(ConfigurationConstants.namenodeURL));
-		hyracksClient = new HyracksClient(clusterConf);
-		dCacheHander = new DCacheHandler(clusterConf
-				.getProperty(ConfigurationConstants.dcacheServerConfiguration));
-	}
+    private void initialize(CompatibilityConfig clConfig) throws Exception {
+        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
+        hadoopAdapter = new HadoopAdapter(clusterConf.getProperty(ConfigurationConstants.namenodeURL));
+        hyracksClient = new HyracksClient(clusterConf);
+        dCacheHander = new DCacheHandler(clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+    }
 
-	public HyracksRunningJob submitJob(JobConf conf,Set<String> userLibs) throws Exception {
-		List<JobConf> jobConfs = new ArrayList<JobConf>();
-		jobConfs.add(conf);
-		String applicationName = conf.getJobName() + System.currentTimeMillis();
-		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
-				applicationName, spec, userLibs);
-		return hyracksRunningJob; 
-	}
-	
-	public HyracksRunningJob submitJobs(String applicationName,
-			String[] jobFiles, Set<String> userLibs) throws Exception {
-		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-		populateDCache(jobFiles[0]);
-		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
-				applicationName, spec, userLibs);
-		return hyracksRunningJob;
-	}
+    public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs) throws Exception {
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        jobConfs.add(conf);
+        String applicationName = conf.getJobName() + System.currentTimeMillis();
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+        return hyracksRunningJob;
+    }
 
-	public HyracksRunningJob submitJobs(String applicationName,
-			String[] jobFiles) throws Exception {
-		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-		populateDCache(jobFiles[0]);
-		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
-		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
-				applicationName, spec);
-		return hyracksRunningJob;
-	}
+    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles, Set<String> userLibs)
+            throws Exception {
+        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+        populateDCache(jobFiles[0]);
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec, userLibs);
+        return hyracksRunningJob;
+    }
 
-	private void populateDCache(String jobFile) throws IOException {
-		Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
-		String tempDir = "/tmp";
-		if (dcacheTasks.size() > 0) {
-			for (String key : dcacheTasks.keySet()) {
-				String destPath = tempDir + "/" + key
-						+ System.currentTimeMillis();
-				hadoopAdapter.getHDFSClient().copyToLocalFile(
-						new Path(dcacheTasks.get(key)), new Path(destPath));
-				System.out.println(" source :" + dcacheTasks.get(key));
-				System.out.println(" dest :" + destPath);
-				System.out.println(" key :" + key);
-				System.out.println(" value :" + destPath);
-				dCacheHander.put(key, destPath);
-			}
-		}
-	}
+    public HyracksRunningJob submitJobs(String applicationName, String[] jobFiles) throws Exception {
+        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+        populateDCache(jobFiles[0]);
+        JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+        HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(applicationName, spec);
+        return hyracksRunningJob;
+    }
 
-	private String getApplicationNameForHadoopJob(JobConf jobConf) {
-		String jar = jobConf.getJar();
-		if (jar != null) {
-			return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
-					.lastIndexOf("/") + 1 : 0);
-		} else {
-			return "" + System.currentTimeMillis();
-		}
-	}
+    private void populateDCache(String jobFile) throws IOException {
+        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+        String tempDir = "/tmp";
+        if (dcacheTasks.size() > 0) {
+            for (String key : dcacheTasks.keySet()) {
+                String destPath = tempDir + "/" + key + System.currentTimeMillis();
+                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
+                System.out.println(" source :" + dcacheTasks.get(key));
+                System.out.println(" dest :" + destPath);
+                System.out.println(" key :" + key);
+                System.out.println(" value :" + destPath);
+                dCacheHander.put(key, destPath);
+            }
+        }
+    }
 
-	private Map<String, String> initializeCustomProperties(
-			Properties properties, String prefix) {
-		Map<String, String> foundProperties = new HashMap<String, String>();
-		Set<Entry<Object, Object>> entrySet = properties.entrySet();
-		for (Entry entry : entrySet) {
-			String key = (String) entry.getKey();
-			String value = (String) entry.getValue();
-			if ((key.startsWith(prefix))) {
-				String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
-				foundProperties.put(actualKey, value);
-			}
-		}
-		return foundProperties;
-	}
+    private String getApplicationNameForHadoopJob(JobConf jobConf) {
+        String jar = jobConf.getJar();
+        if (jar != null) {
+            return jar.substring(jar.lastIndexOf("/") >= 0 ? jar.lastIndexOf("/") + 1 : 0);
+        } else {
+            return "" + System.currentTimeMillis();
+        }
+    }
 
-	public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
-		Properties jobProperties = Utilities.getProperties(jobFile, ',');
-		Map<String, String> dcacheTasks = new HashMap<String, String>();
-		Map<String, String> dcacheKeys = initializeCustomProperties(
-				jobProperties, dacheKeyPrefix);
-		for (String key : dcacheKeys.keySet()) {
-			String sourcePath = dcacheKeys.get(key);
-			if (sourcePath != null) {
-				dcacheTasks.put(key, sourcePath);
-			}
-		}
-		return dcacheTasks;
-	}
+    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
+        Map<String, String> foundProperties = new HashMap<String, String>();
+        Set<Entry<Object, Object>> entrySet = properties.entrySet();
+        for (Entry entry : entrySet) {
+            String key = (String) entry.getKey();
+            String value = (String) entry.getValue();
+            if ((key.startsWith(prefix))) {
+                String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+                foundProperties.put(actualKey, value);
+            }
+        }
+        return foundProperties;
+    }
 
-	public void waitForCompletion(UUID jobId) throws Exception {
-		hyracksClient.waitForCompleton(jobId);
-	}
+    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, ',');
+        Map<String, String> dcacheTasks = new HashMap<String, String>();
+        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
+        for (String key : dcacheKeys.keySet()) {
+            String sourcePath = dcacheKeys.get(key);
+            if (sourcePath != null) {
+                dcacheTasks.put(key, sourcePath);
+            }
+        }
+        return dcacheTasks;
+    }
 
-	private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
-			throws Exception {
-		List<JobConf> jobConfs = new ArrayList<JobConf>();
-		for (String jobFile : jobFiles) {
-			jobConfs.add(constructHadoopJobConf(jobFile));
-		}
-		return jobConfs;
-	}
+    public void waitForCompletion(JobId jobId) throws Exception {
+        hyracksClient.waitForCompletion(jobId);
+    }
 
-	private JobConf constructHadoopJobConf(String jobFile) {
-		Properties jobProperties = Utilities.getProperties(jobFile, '=');
-		JobConf conf = new JobConf(hadoopAdapter.getConf());
-		for (Entry entry : jobProperties.entrySet()) {
-			conf.set((String) entry.getKey(), (String) entry.getValue());
-			System.out.println((String) entry.getKey() + " : "
-					+ (String) entry.getValue());
-		}
-		return conf;
-	}
+    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
+        List<JobConf> jobConfs = new ArrayList<JobConf>();
+        for (String jobFile : jobFiles) {
+            jobConfs.add(constructHadoopJobConf(jobFile));
+        }
+        return jobConfs;
+    }
 
-	private String[] getJobs(CompatibilityConfig clConfig) {
-		return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
-				.split(",");
-	}
+    private JobConf constructHadoopJobConf(String jobFile) {
+        Properties jobProperties = Utilities.getProperties(jobFile, '=');
+        JobConf conf = new JobConf(hadoopAdapter.getConf());
+        for (Entry entry : jobProperties.entrySet()) {
+            conf.set((String) entry.getKey(), (String) entry.getValue());
+            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
+        }
+        return conf;
+    }
 
-	public static void main(String args[]) throws Exception {
-		long startTime = System.nanoTime();
-		CompatibilityConfig clConfig = new CompatibilityConfig();
-		CmdLineParser cp = new CmdLineParser(clConfig);
-		try {
-			cp.parseArgument(args);
-		} catch (Exception e) {
-			System.err.println(e.getMessage());
-			cp.printUsage(System.err);
-			return;
-		}
-		CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
-		String applicationName = clConfig.applicationName;
-		String[] jobFiles = compatLayer.getJobs(clConfig);
-		String[] userLibraries = null;
-		if (clConfig.userLibs != null) {
-			userLibraries = clConfig.userLibs.split(",");
-		}
-		try {
-			HyracksRunningJob hyraxRunningJob = null;
-			if (userLibraries != null) {
-				Set<String> userLibs = new HashSet<String>();
-				for (String userLib : userLibraries) {
-					userLibs.add(userLib);
-				}
-				hyraxRunningJob = compatLayer.submitJobs(applicationName,
-						jobFiles, userLibs);
-			} else {
-				hyraxRunningJob = compatLayer.submitJobs(applicationName,
-						jobFiles);
-			}
-			compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
-			long end_time = System.nanoTime();
-			System.out.println("TOTAL TIME (from Launch to Completion):"
-					+ ((end_time - startTime) / (float) 1000000000.0)
-					+ " seconds.");
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw e;
-		}
-	}
+    private String[] getJobs(CompatibilityConfig clConfig) {
+        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
+    }
+
+    public static void main(String args[]) throws Exception {
+        long startTime = System.nanoTime();
+        CompatibilityConfig clConfig = new CompatibilityConfig();
+        CmdLineParser cp = new CmdLineParser(clConfig);
+        try {
+            cp.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            cp.printUsage(System.err);
+            return;
+        }
+        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+        String applicationName = clConfig.applicationName;
+        String[] jobFiles = compatLayer.getJobs(clConfig);
+        String[] userLibraries = null;
+        if (clConfig.userLibs != null) {
+            userLibraries = clConfig.userLibs.split(",");
+        }
+        try {
+            HyracksRunningJob hyraxRunningJob = null;
+            if (userLibraries != null) {
+                Set<String> userLibs = new HashSet<String>();
+                for (String userLib : userLibraries) {
+                    userLibs.add(userLib);
+                }
+                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles, userLibs);
+            } else {
+                hyraxRunningJob = compatLayer.submitJobs(applicationName, jobFiles);
+            }
+            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+            long end_time = System.nanoTime();
+            System.out.println("TOTAL TIME (from Launch to Completion):"
+                    + ((end_time - startTime) / (float) 1000000000.0) + " seconds.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
 }
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index f2f7d03..28833d7 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -40,7 +40,7 @@
 import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -370,13 +370,13 @@
 		return externalSortOp;
 	}
 
-	public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+	public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
 			JobConf conf, JobSpecification spec) {
 
 		Class mapOutputKeyClass = conf.getMapOutputKeyClass();
 		Class mapOutputValueClass = conf.getMapOutputValueClass();
 
-		MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+		MToNPartitioningConnectorDescriptor connectorDescriptor = null;
 		ITuplePartitionComputerFactory factory = null;
 		conf.getMapOutputKeyClass();
 		if (conf.getPartitionerClass() != null
@@ -398,7 +398,7 @@
 			factory = new HadoopHashTuplePartitionComputerFactory(
 					mapOutputKeySerializerDerserializer);
 		}
-		connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec,
+		connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
 				factory);
 		return connectorDescriptor;
 	}