Merged -r 438:524 from trunk into branch

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@525 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-hadoop-compat/pom.xml b/hyracks-hadoop-compat/pom.xml
index 26b7bd6..44eb842 100644
--- a/hyracks-hadoop-compat/pom.xml
+++ b/hyracks-hadoop-compat/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-hadoop-compat</artifactId>
-  <version>0.1.5</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.5</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -79,7 +79,7 @@
     <dependency>
     	<groupId>edu.uci.ics.hyracks</groupId>
     	<artifactId>hyracks-dataflow-hadoop</artifactId>
-    	<version>0.1.5</version>
+    	<version>0.1.7-SNAPSHOT</version>
     	<type>jar</type>
     	<scope>compile</scope>
     </dependency>
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index e4daf0b..a363221 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -1,93 +1,91 @@
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
-import java.io.File;
-import java.util.List;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
-import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
-import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
-import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
 
 public class HyracksClient {
 
-    private HadoopAdapter hadoopAdapter;
-    private static HyracksRMIConnection connection;
-    private static final String applicationName = "CompatibilityLayer";
+	private static HyracksRMIConnection connection;
+	private static final String jobProfilingKey = "jobProfilingKey";
+	Set<String> systemLibs;
 
-    public HyracksClient(String clusterConf) throws Exception {
-        Properties properties = Utilities.getProperties(clusterConf, '=');
-        String clusterController = (String) properties.get(ConfigurationConstants.clusterControllerHost);
-        String fileSystem = (String) properties.get(ConfigurationConstants.namenodeURL);
-        initialize(clusterController, fileSystem);
-    }
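+	// Connects to the cluster controller named in the cluster properties
+	// (RMI port 1099) and remembers the paths of any configured system
+	// libraries for later application deployment.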
+	public HyracksClient(Properties clusterProperties) throws Exception {
+		initialize(clusterProperties);
+	}
 
-    public HyracksClient(String clusterControllerAddr, String fileSystem) throws Exception {
-        initialize(clusterControllerAddr, fileSystem);
-    }
+	private void initialize(Properties properties) throws Exception {
+		String clusterController = (String) properties
+				.get(ConfigurationConstants.clusterControllerHost);
+		connection = new HyracksRMIConnection(clusterController, 1099);
+		systemLibs = new HashSet<String>();
+		for (String systemLib : ConfigurationConstants.systemLibs) {
+			String systemLibPath = properties.getProperty(systemLib);
+			if (systemLibPath != null) {
+				systemLibs.add(systemLibPath);
+			}
+		}
+	}
 
-    private void initialize(String clusterControllerAddr, String namenodeUrl) throws Exception {
-        connection = new HyracksRMIConnection(clusterControllerAddr, 1099);
-        connection.destroyApplication(applicationName);
-        hadoopAdapter = new HadoopAdapter(namenodeUrl);
-    }
+	public HyracksClient(String clusterConf, char delimiter) throws Exception {
+		Properties properties = Utilities.getProperties(clusterConf, delimiter);
+		initialize(properties);
+	}
 
-    public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
-        JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
-        String appName  = getApplicationNameHadoopJob(confs.get(0));
-        return submitJob(appName,spec, requiredLibs);
-    }
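+	// An application archive must carry both the system libraries from the
+	// cluster configuration and the libraries supplied by the user.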
+	private Set<String> getRequiredLibs(Set<String> userLibs) {
+		Set<String> requiredLibs = new HashSet<String>();
+		for (String systemLib : systemLibs) {
+			requiredLibs.add(systemLib);
+		}
+		for (String userLib : userLibs) {
+			requiredLibs.add(userLib);
+		}
+		return requiredLibs;
+	}
 
-    private String getApplicationNameHadoopJob(JobConf jobConf) {
-        String jar = jobConf.getJar();
-        if( jar != null){
-            return jar.substring(jar.lastIndexOf("/") >=0 ? jar.lastIndexOf("/") +1 : 0);
-        }else {
-            return "" + System.currentTimeMillis();
-        }
-    }
-    
-    public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
-        JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
-        String appName  = getApplicationNameHadoopJob(conf);
-        return submitJob(appName, spec, requiredLibs);
-    }
+	public JobStatus getJobStatus(UUID jobId) throws Exception {
+		return connection.getJobStatus(jobId);
+	}
 
-    public JobStatus getJobStatus(UUID jobId) throws Exception {
-        return connection.getJobStatus(jobId);
-    }
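+	// Bundles the required libraries into a Hyracks application archive and
+	// deploys it to the cluster controller under the given application name.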
+	private void createApplication(String applicationName, Set<String> userLibs)
+			throws Exception {
+		connection.createApplication(applicationName, Utilities
+				.getHyracksArchive(applicationName, getRequiredLibs(userLibs)));
+	}
 
-    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs) throws Exception {
-        UUID jobId = null;
-        try {
-            jobId = connection.createJob(applicationName, spec);
-        } catch (Exception e){
-            System.out.println(" application not found, creating application" + applicationName);
-            connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
-            jobId = connection.createJob(applicationName, spec);
-        }
-        connection.start(jobId);
-        HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
-        return runningJob;
-    }
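+	// Runtime profiling is requested by setting the "jobProfilingKey"
+	// environment variable to "true"; the job is then created with
+	// JobFlag.PROFILE_RUNTIME.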
+	public HyracksRunningJob submitJob(String applicationName,
+			JobSpecification spec) throws Exception {
+		String jobProfilingVal = System.getenv(jobProfilingKey);
+		boolean doProfiling = ("true".equalsIgnoreCase(jobProfilingVal));
+		UUID jobId;
+		if (doProfiling) {
+			System.out.println("PROFILING");
+			jobId = connection.createJob(applicationName, spec, EnumSet
+					.of(JobFlag.PROFILE_RUNTIME));
+		} else {
+			jobId = connection.createJob(applicationName, spec);
+		}
+		connection.start(jobId);
+		HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
+		return runningJob;
+	}
 
-    public HadoopAdapter getHadoopAdapter() {
-        return hadoopAdapter;
-    }
+	public HyracksRunningJob submitJob(String applicationName,
+			JobSpecification spec, Set<String> userLibs) throws Exception {
+		createApplication(applicationName, userLibs);
+		return submitJob(applicationName, spec);
+	}
 
-    public void setHadoopAdapter(HadoopAdapter hadoopAdapter) {
-        this.hadoopAdapter = hadoopAdapter;
-    }
-
-    public void waitForCompleton(UUID jobId) throws Exception {
-        connection.waitForCompletion(jobId);
-    }
-
+	public void waitForCompleton(UUID jobId) throws Exception {
+		connection.waitForCompletion(jobId);
+	}
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index 0b96041..37f4d34 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.hadoop.compat.driver;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,175 +26,183 @@
 
 public class CompatibilityLayer {
 
-    HyracksClient hyracksClient;
-    DCacheHandler dCacheHander = null;
-    Properties clusterConf;
-    Set<String> systemLibs;
+	HyracksClient hyracksClient;
+	DCacheHandler dCacheHander = null;
+	Properties clusterConf;
+	HadoopAdapter hadoopAdapter;
 
-    private static char configurationFileDelimiter = '=';
-    private static final String dacheKeyPrefix = "dcache.key";
+	private static char configurationFileDelimiter = '=';
+	private static final String dacheKeyPrefix = "dcache.key";
 
-    public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
-        initialize(clConfig);
-    }
+	public CompatibilityLayer(CompatibilityConfig clConfig) throws Exception {
+		initialize(clConfig);
+	}
 
-    public HyracksRunningJob submitJobs(String[] jobFiles, Set<String> userLibs) throws Exception {
-        Set<String> requiredLibs = getRequiredLibs(userLibs);
-        List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
-        Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
-        String tempDir = "/tmp";
-        if (dcacheTasks.size() > 0) {
-            HadoopAdapter hadoopAdapter = hyracksClient.getHadoopAdapter();
-            for (String key : dcacheTasks.keySet()) {
-                String destPath = tempDir + "/" + key + System.currentTimeMillis();
-                hadoopAdapter.getHDFSClient().copyToLocalFile(new Path(dcacheTasks.get(key)), new Path(destPath));
-                System.out.println(" source :" + dcacheTasks.get(key));
-                System.out.println(" dest :" + destPath);
-                System.out.println(" key :" + key);
-                System.out.println(" value :" + destPath);
-                dCacheHander.put(key, destPath);
-            }
-        }
-        HyracksRunningJob hyraxRunningJob = hyracksClient.submitJobs(jobConfs, requiredLibs);
-        return hyraxRunningJob;
-    }
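+	// Reads the cluster configuration file and constructs the Hyracks client,
+	// the HadoopAdapter (for HDFS and JobConf access) and the DCache handler.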
+	private void initialize(CompatibilityConfig clConfig) throws Exception {
+		clusterConf = Utilities.getProperties(clConfig.clusterConf,
+				configurationFileDelimiter);
+		hadoopAdapter = new HadoopAdapter(clusterConf
+				.getProperty(ConfigurationConstants.namenodeURL));
+		hyracksClient = new HyracksClient(clusterConf);
+		dCacheHander = new DCacheHandler(clusterConf
+				.getProperty(ConfigurationConstants.dcacheServerConfiguration));
+	}
 
-    private Set<String> getRequiredLibs(Set<String> userLibs) {
-        Set<String> requiredLibs = new HashSet<String>();
-        for (String systemLib : systemLibs) {
-            requiredLibs.add(systemLib);
-        }
-        for (String userLib : userLibs) {
-            requiredLibs.add(userLib);
-        }
-        return requiredLibs;
-    }
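+	// Submits a single Hadoop JobConf; the application name is derived from
+	// the job name plus a timestamp so repeated submissions do not collide.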
+	public HyracksRunningJob submitJob(JobConf conf, Set<String> userLibs)
+			throws Exception {
+		List<JobConf> jobConfs = new ArrayList<JobConf>();
+		jobConfs.add(conf);
+		String applicationName = conf.getJobName() + System.currentTimeMillis();
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec, userLibs);
+		return hyracksRunningJob;
+	}
+
+	public HyracksRunningJob submitJobs(String applicationName,
+			String[] jobFiles, Set<String> userLibs) throws Exception {
+		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+		populateDCache(jobFiles[0]);
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec, userLibs);
+		return hyracksRunningJob;
+	}
 
-    private void initialize(CompatibilityConfig clConfig) throws Exception {
-        clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
-        systemLibs = new HashSet<String>();
-        for (String systemLib : ConfigurationConstants.systemLibs) {
-            String systemLibPath = clusterConf.getProperty(systemLib);
-            if (systemLibPath != null) {
-                systemLibs.add(systemLibPath);
-            }
-        }
-        String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
-        String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
-        String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
-        hyracksClient = new HyracksClient(clusterControllerHost, fileSystem);
-        try {
-            dCacheHander = new DCacheHandler(dacheServerConfiguration);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+	public HyracksRunningJob submitJobs(String applicationName,
+			String[] jobFiles) throws Exception {
+		List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
+		populateDCache(jobFiles[0]);
+		JobSpecification spec = hadoopAdapter.getJobSpecification(jobConfs);
+		HyracksRunningJob hyracksRunningJob = hyracksClient.submitJob(
+				applicationName, spec);
+		return hyracksRunningJob;
+	}
 
-    private Map<String, String> initializeCustomProperties(Properties properties, String prefix) {
-        Map<String, String> foundProperties = new HashMap<String, String>();
-        Set<Entry<Object, Object>> entrySet = properties.entrySet();
-        for (Entry entry : entrySet) {
-            String key = (String) entry.getKey();
-            String value = (String) entry.getValue();
-            if ((key.startsWith(prefix))) {
-                String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
-                foundProperties.put(actualKey, value);
-            }
-        }
-        return foundProperties;
-    }
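+	// For every dcache.key.* entry in the job file, copies the referenced
+	// HDFS file to a local path under /tmp and registers that path with the
+	// DCache handler under the corresponding key.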
+	private void populateDCache(String jobFile) throws IOException {
+		Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFile);
+		String tempDir = "/tmp";
+		if (dcacheTasks.size() > 0) {
+			for (String key : dcacheTasks.keySet()) {
+				String destPath = tempDir + "/" + key
+						+ System.currentTimeMillis();
+				hadoopAdapter.getHDFSClient().copyToLocalFile(
+						new Path(dcacheTasks.get(key)), new Path(destPath));
+				System.out.println(" source :" + dcacheTasks.get(key));
+				System.out.println(" dest :" + destPath);
+				System.out.println(" key :" + key);
+				System.out.println(" value :" + destPath);
+				dCacheHander.put(key, destPath);
+			}
+		}
+	}
 
-    public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, ',');
-        Map<String, String> dcacheTasks = new HashMap<String, String>();
-        Map<String, String> dcacheKeys = initializeCustomProperties(jobProperties, dacheKeyPrefix);
-        for (String key : dcacheKeys.keySet()) {
-            String sourcePath = dcacheKeys.get(key);
-            if (sourcePath != null) {
-                dcacheTasks.put(key, sourcePath);
-            }
-        }
-        return dcacheTasks;
-    }
+	private String getApplicationNameForHadoopJob(JobConf jobConf) {
+		String jar = jobConf.getJar();
+		if (jar != null) {
+			return jar.substring(jar.lastIndexOf("/") >= 0 ? jar
+					.lastIndexOf("/") + 1 : 0);
+		} else {
+			return "" + System.currentTimeMillis();
+		}
+	}
 
-    public void waitForCompletion(UUID jobId) throws Exception {
-        hyracksClient.waitForCompleton(jobId);
-    }
+	private Map<String, String> initializeCustomProperties(
+			Properties properties, String prefix) {
+		Map<String, String> foundProperties = new HashMap<String, String>();
+		Set<Entry<Object, Object>> entrySet = properties.entrySet();
+		for (Entry entry : entrySet) {
+			String key = (String) entry.getKey();
+			String value = (String) entry.getValue();
+			if ((key.startsWith(prefix))) {
+				String actualKey = key.substring(prefix.length() + 1); // "cut off '<prefix>.' from the beginning"
+				foundProperties.put(actualKey, value);
+			}
+		}
+		return foundProperties;
+	}
 
-    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, Set<String> userLibs) {
-        HyracksRunningJob hyraxRunningJob = null;
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        jobConfs.add(jobConf);
-        try {
-            hyraxRunningJob = hyracksClient.submitJobs(jobConfs, getRequiredLibs(userLibs));
-            System.out.println(" Result in " + jobConf.get("mapred.output.dir"));
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return hyraxRunningJob;
-    }
+	public Map<String, String> preparePreLaunchDCacheTasks(String jobFile) {
+		Properties jobProperties = Utilities.getProperties(jobFile, ',');
+		Map<String, String> dcacheTasks = new HashMap<String, String>();
+		Map<String, String> dcacheKeys = initializeCustomProperties(
+				jobProperties, dacheKeyPrefix);
+		for (String key : dcacheKeys.keySet()) {
+			String sourcePath = dcacheKeys.get(key);
+			if (sourcePath != null) {
+				dcacheTasks.put(key, sourcePath);
+			}
+		}
+		return dcacheTasks;
+	}
 
-    public HyracksRunningJob submitJob(String appName, JobSpecification jobSpec, Set<String> userLibs) {
-        HyracksRunningJob hyraxRunningJob = null;
-        try {
-            hyraxRunningJob = hyracksClient.submitJob(appName, jobSpec, getRequiredLibs(userLibs));
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return hyraxRunningJob;
-    }
+	public void waitForCompletion(UUID jobId) throws Exception {
+		hyracksClient.waitForCompleton(jobId);
+	}
 
-    private List<JobConf> constructHadoopJobConfs(String[] jobFiles) throws Exception {
-        List<JobConf> jobConfs = new ArrayList<JobConf>();
-        for (String jobFile : jobFiles) {
-            jobConfs.add(constructHadoopJobConf(jobFile));
-        }
-        return jobConfs;
-    }
+	private List<JobConf> constructHadoopJobConfs(String[] jobFiles)
+			throws Exception {
+		List<JobConf> jobConfs = new ArrayList<JobConf>();
+		for (String jobFile : jobFiles) {
+			jobConfs.add(constructHadoopJobConf(jobFile));
+		}
+		return jobConfs;
+	}
 
-    private JobConf constructHadoopJobConf(String jobFile) {
-        Properties jobProperties = Utilities.getProperties(jobFile, '=');
-        JobConf conf = hyracksClient.getHadoopAdapter().getConf();
-        for (Entry entry : jobProperties.entrySet()) {
-            conf.set((String) entry.getKey(), (String) entry.getValue());
-            System.out.println((String) entry.getKey() + " : " + (String) entry.getValue());
-        }
-        return conf;
-    }
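+	// Clones the adapter's base configuration and overrides it with the
+	// key/value pairs read from the job file.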
+	private JobConf constructHadoopJobConf(String jobFile) {
+		Properties jobProperties = Utilities.getProperties(jobFile, '=');
+		JobConf conf = new JobConf(hadoopAdapter.getConf());
+		for (Entry entry : jobProperties.entrySet()) {
+			conf.set((String) entry.getKey(), (String) entry.getValue());
+			System.out.println((String) entry.getKey() + " : "
+					+ (String) entry.getValue());
+		}
+		return conf;
+	}
 
-    private String[] getJobs(CompatibilityConfig clConfig) {
-        return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles.split(",");
-    }
+	private String[] getJobs(CompatibilityConfig clConfig) {
+		return clConfig.jobFiles == null ? new String[0] : clConfig.jobFiles
+				.split(",");
+	}
 
-    public static void main(String args[]) throws Exception {
-        long startTime = System.nanoTime();
-        CompatibilityConfig clConfig = new CompatibilityConfig();
-        CmdLineParser cp = new CmdLineParser(clConfig);
-        try {
-            cp.parseArgument(args);
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            cp.printUsage(System.err);
-            return;
-        }
-        CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
-        String[] jobFiles = compatLayer.getJobs(clConfig);
-        String[] tempUserLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
-        Set<String> userLibs = new HashSet<String>();
-        for(String userLib : tempUserLibs) {
-            userLibs.add(userLib);
-        }
-        HyracksRunningJob hyraxRunningJob = null;
-        try {
-            hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
-            compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        hyraxRunningJob.waitForCompletion();
-        long end_time = System.nanoTime();
-        System.out.println("TOTAL TIME (from Launch to Completion):" + ((end_time - startTime) / (float) 1000000000.0)
-                + " seconds.");
-    }
-
+	public static void main(String args[]) throws Exception {
+		long startTime = System.nanoTime();
+		CompatibilityConfig clConfig = new CompatibilityConfig();
+		CmdLineParser cp = new CmdLineParser(clConfig);
+		try {
+			cp.parseArgument(args);
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			cp.printUsage(System.err);
+			return;
+		}
+		CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
+		String applicationName = clConfig.applicationName;
+		String[] jobFiles = compatLayer.getJobs(clConfig);
+		String[] userLibraries = null;
+		if (clConfig.userLibs != null) {
+			userLibraries = clConfig.userLibs.split(",");
+		}
+		try {
+			HyracksRunningJob hyraxRunningJob = null;
+			if (userLibraries != null) {
+				Set<String> userLibs = new HashSet<String>();
+				for (String userLib : userLibraries) {
+					userLibs.add(userLib);
+				}
+				hyraxRunningJob = compatLayer.submitJobs(applicationName,
+						jobFiles, userLibs);
+			} else {
+				hyraxRunningJob = compatLayer.submitJobs(applicationName,
+						jobFiles);
+			}
+			compatLayer.waitForCompletion(hyraxRunningJob.getJobId());
+			long end_time = System.nanoTime();
+			System.out.println("TOTAL TIME (from Launch to Completion):"
+					+ ((end_time - startTime) / (float) 1000000000.0)
+					+ " seconds.");
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
index 1dd266f..6d94bc7 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -4,17 +4,20 @@
 
 public class CompatibilityConfig {
 
-    @Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info: +"
-            + " (1) Address of HyracksClusterController service" + " (2) Address of Hadoop namenode service")
-    public String clusterConf;
+	@Option(name = "-cluster", required = true, usage = "Defines the path to the configuration file that provides the following info:"
+			+ " (1) Address of HyracksClusterController service"
+			+ " (2) Address of Hadoop namenode service")
+	public String clusterConf;
 
-    @Option(name = "-jobFiles", usage = "Comma separated list of jobFiles. "
-            + "Each job file defines the hadoop job + " + "The order in the list defines the sequence in which"
-            + "the jobs are to be executed")
-    public String jobFiles;
+	@Option(name = "-jobFiles", usage = "Comma separated list of job files. "
+			+ "Each job file defines a Hadoop job. "
+			+ "The order in the list defines the sequence in which "
+			+ "the jobs are to be executed.")
+	public String jobFiles;
 
-    @Option(name = "-userLibs", usage = " A comma separated list of jar files that are required to be addedd to classpath when running "
-            + " mappers/reducers etc ")
-    public String userLibs;
+	@Option(name = "-applicationName", usage = "The name of the application as part of which the job executes")
+	public String applicationName;
 
+	@Option(name = "-userLibs", usage = "A comma separated list of jar files that are required to be added to the classpath when running mappers/reducers")
+	public String userLibs;
 }
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index d0df7f1..f2f7d03 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -47,310 +47,360 @@
 
 public class HadoopAdapter {
 
-    public static final String FS_DEFAULT_NAME = "fs.default.name";
-    private JobConf jobConf;
-    private Map<OperatorDescriptorId,Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId,Integer>();
-    public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; 
-    public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
-    public static final int DEFAULT_MAX_MAPPERS = 40;
-    public static final int DEFAULT_MAX_REDUCERS= 40;
-    public static final String MAX_MAPPERS_KEY = "maxMappers";
-    public static final String MAX_REDUCERS_KEY = "maxReducers";
-    public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
-    
-    private  int maxMappers = DEFAULT_MAX_MAPPERS;
-    private  int maxReducers = DEFAULT_MAX_REDUCERS;
-    private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
-    
-    class NewHadoopConstants {
-    	public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
-    	public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
-    	public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
-    	public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
-    	public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
-    	public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
-    }
-    
-    public HadoopAdapter(String namenodeUrl) {
-        jobConf = new JobConf(true);
-        jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
-        if(System.getenv(MAX_MAPPERS_KEY) != null) {
-            maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
-        }
-        if(System.getenv(MAX_REDUCERS_KEY) != null) {
-            maxReducers= Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
-        }
-        if(System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
-            exSortFrame= Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
-        }
-    }
+	public static final String FS_DEFAULT_NAME = "fs.default.name";
+	private JobConf jobConf;
+	private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
+	public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
+	public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+	public static final int DEFAULT_MAX_MAPPERS = 40;
+	public static final int DEFAULT_MAX_REDUCERS = 40;
+	public static final String MAX_MAPPERS_KEY = "maxMappers";
+	public static final String MAX_REDUCERS_KEY = "maxReducers";
+	public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
 
-    private String getEnvironmentVariable(String key, String def) {
-        String ret =  System.getenv(key);
-        return ret != null ? ret : def;
-    }
-    
-    public JobConf getConf() {
-        return jobConf;
-    }
+	private int maxMappers = DEFAULT_MAX_MAPPERS;
+	private int maxReducers = DEFAULT_MAX_REDUCERS;
+	private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
 
-    public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
-            throws IOException {
-        VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
-                jobConf);
-        return versionedProtocol;
-    }
+	class NewHadoopConstants {
+		public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
+		public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+		public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+		public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+		public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
+		public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
+	}
 
-    private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
-        RecordDescriptor recordDescriptor = null;
-        try {
-            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
-                    .forName(className1), (Class<? extends Writable>) Class.forName(className2));
-        } catch (ClassNotFoundException cnfe) {
-            cnfe.printStackTrace();
-        }
-        return recordDescriptor;
-    }
+	public HadoopAdapter(String namenodeUrl) {
+		jobConf = new JobConf(true);
+		jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+		if (System.getenv(MAX_MAPPERS_KEY) != null) {
+			maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+		}
+		if (System.getenv(MAX_REDUCERS_KEY) != null) {
+			maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+		}
+		if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+			exSortFrame = Integer.parseInt(System
+					.getenv(EX_SORT_FRAME_LIMIT_KEY));
+		}
+	}
 
-    private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
-        if (conf.getUseNewMapper()) {
-        	return getNewInputSplits(conf);
-        } else {
-        	return getOldInputSplits(conf);
-        }
-    }
-    
-    private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException, IOException, InterruptedException {
-    	org.apache.hadoop.mapreduce.InputSplit[] splits = null;
-    	JobContext context = new JobContext(conf,null);
-    	org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(),conf);
-    	List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
-    	return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[]{});
-    }
-    
-    private InputSplit[] getOldInputSplits(JobConf conf) throws IOException  {
-      	InputFormat inputFormat = conf.getInputFormat();
-    	return inputFormat.getSplits(conf, conf.getNumMapTasks());
-    }
-   
-    private void configurePartitionCountConstraint(JobSpecification spec, IOperatorDescriptor operator,int instanceCount){
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, instanceCount);
-        operatorInstanceCount.put(operator.getOperatorId(),instanceCount);
-    }
+	private String getEnvironmentVariable(String key, String def) {
+		String ret = System.getenv(key);
+		return ret != null ? ret : def;
+	}
 
-    public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
-            throws Exception {
-        boolean selfRead = previousOp == null;
-        IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
-        HadoopMapperOperatorDescriptor mapOp = null;
-        if(selfRead) {
-            Object [] splits = getInputSplits(conf,maxMappers);
-            mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
-	    configurePartitionCountConstraint(spec,mapOp,splits.length);
-            System.out.println("No of  mappers :" + splits.length);
-        } else {
-	    configurePartitionCountConstraint(spec,mapOp,getInstanceCount(previousOp));
-            mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
-            spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, mapOp, 0);
-        }
-        return mapOp;
-    }
+	public JobConf getConf() {
+		return jobConf;
+	}
 
-    public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
-        HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
-                new ClasspathBasedHadoopClassFactory());
-        return reduceOp;
-    }
+	public static VersionedProtocol getProtocol(Class protocolClass,
+			InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
+		VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
+				ClientProtocol.versionID, inetAddress, jobConf);
+		return versionedProtocol;
+	}
 
-    public FileSystem getHDFSClient() {
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = FileSystem.get(jobConf);
-        } catch (IOException ioe) {
-            ioe.printStackTrace();
-        }
-        return fileSystem;
-    }
+	private static RecordDescriptor getHadoopRecordDescriptor(
+			String className1, String className2) {
+		RecordDescriptor recordDescriptor = null;
+		try {
+			recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+					(Class<? extends Writable>) Class.forName(className1),
+					(Class<? extends Writable>) Class.forName(className2));
+		} catch (ClassNotFoundException cnfe) {
+			cnfe.printStackTrace();
+		}
+		return recordDescriptor;
+	}
 
-    public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
-        JobSpecification spec = null;
-        if (jobConfs.size() == 1) {
-            spec = getJobSpecification(jobConfs.get(0));
-        } else {
-            spec = getPipelinedSpec(jobConfs);
-        }
-        return spec;
-    }
+	private Object[] getInputSplits(JobConf conf) throws IOException,
+			ClassNotFoundException, InterruptedException {
+		if (conf.getUseNewMapper()) {
+			return getNewInputSplits(conf);
+		} else {
+			return getOldInputSplits(conf);
+		}
+	}
 
-    private IOperatorDescriptor configureOutput( IOperatorDescriptor previousOperator, JobConf conf,
-            JobSpecification spec) throws Exception {
-	int instanceCountPreviousOperator = operatorInstanceCount.get(previousOperator.getOperatorId());
-        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : instanceCountPreviousOperator;
-        HadoopWriteOperatorDescriptor writer = null;
-        writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
-	configurePartitionCountConstraint(spec,writer,numOutputters);
-        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
-        return writer;
-    }
+	private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
+			JobConf conf) throws ClassNotFoundException, IOException,
+			InterruptedException {
+		org.apache.hadoop.mapreduce.InputSplit[] splits = null;
+		JobContext context = new JobContext(conf, null);
+		org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
+				.newInstance(context.getInputFormatClass(), conf);
+		List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
+				.getSplits(context);
+		return inputSplits
+				.toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
+	}
 
+	private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
+		InputFormat inputFormat = conf.getInputFormat();
+		return inputFormat.getSplits(conf, conf.getNumMapTasks());
+	}
 
-    private int getInstanceCount(IOperatorDescriptor operator) {
-        return operatorInstanceCount.get(operator.getOperatorId());
-    } 
+	private void configurePartitionCountConstraint(JobSpecification spec,
+			IOperatorDescriptor operator, int instanceCount) {
+		PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+				instanceCount);
+		operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
+	}
 
-    private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec) throws Exception {
-        boolean useCombiner = (jobConf.getCombinerClass() != null);
-        IOperatorDescriptor mapSideOutputOp = previousOperator;
-        if (useCombiner) {
-            System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
-            IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
-	    configurePartitionCountConstraint(spec,mapSideCombineSortOp,getInstanceCount(previousOperator));
-    
-            HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
-	    configurePartitionCountConstraint(spec,mapSideCombineReduceOp,getInstanceCount(previousOperator));
-            spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
-            mapSideOutputOp = mapSideCombineSortOp;
-        }
-        return mapSideOutputOp;
-    }
-    
-    private int getNumReduceTasks(JobConf jobConf) {
-        int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
-        return numReduceTasks;
-    }
-    
-    private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec) throws Exception {
-        IOperatorDescriptor mrOutputOperator = previousOperator;
-        if (jobConf.getNumReduceTasks() != 0) {
-            IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
-            HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
-            int numReduceTasks = getNumReduceTasks(jobConf);
-            System.out.println("No of Reducers :" + numReduceTasks);
-	    configurePartitionCountConstraint(spec,sorter,numReduceTasks);
-	    configurePartitionCountConstraint(spec,reducer,numReduceTasks);
-    
-            IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
-            spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
-            mrOutputOperator = reducer;
-        }   
-        return mrOutputOperator;
-    }
-    
-    private long getInputSize(Object[] splits,JobConf conf) throws IOException, InterruptedException {
-        long totalInputSize =0;
-    	if(conf.getUseNewMapper()) {
-        	for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[])splits) {
-        	    totalInputSize += split.getLength();
-            }                                       
-        } else {
-	    	for (InputSplit split : (InputSplit[])splits) {
-	            totalInputSize += split.getLength();
-	        }
-        }
-    	return totalInputSize;
-    }
-    
-    private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
-        Object[] splits = getInputSplits(conf);
-        System.out.println(" initial split count :" + splits.length);
-        System.out.println(" desired mappers :" + desiredMaxMappers);
-        if (splits.length > desiredMaxMappers) {
-            long totalInputSize = getInputSize(splits,conf);
-            long goalSize = (totalInputSize/desiredMaxMappers);
-            System.out.println(" total input length :" + totalInputSize);
-            System.out.println(" goal size :" + goalSize);
-            conf.setLong("mapred.min.split.size", goalSize);
-            conf.setNumMapTasks(desiredMaxMappers);
-            splits = getInputSplits(conf);
-            System.out.println(" revised split count :" + splits.length);
-        }
-        return splits; 
-    }
-    
-    public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
-        JobSpecification spec = new JobSpecification();
-        Iterator<JobConf> iterator = jobConfs.iterator();
-        JobConf firstMR = iterator.next();
-        IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
-        while (iterator.hasNext())
-            for (JobConf currentJobConf : jobConfs) {
-                mrOutputOp = configureMapReduce(mrOutputOp, spec , currentJobConf);
-            }
-        configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
-        return spec;
-    }
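+	// A mapper with no upstream operator reads its own input splits;
+	// otherwise it consumes the previous operator's output through a
+	// one-to-one connector.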
+	public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
+			JobSpecification spec, IOperatorDescriptor previousOp)
+			throws Exception {
+		boolean selfRead = previousOp == null;
+		IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+		HadoopMapperOperatorDescriptor mapOp = null;
+		if (selfRead) {
+			Object[] splits = getInputSplits(conf, maxMappers);
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
+					classFactory);
+			configurePartitionCountConstraint(spec, mapOp, splits.length);
+		} else {
+			mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
+			configurePartitionCountConstraint(spec, mapOp,
+					getInstanceCount(previousOp));
+			spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
+					mapOp, 0);
+		}
+		return mapOp;
+	}
 
-    public JobSpecification getJobSpecification(JobConf conf) throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
-        IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
-        spec.addRoot(printer);
-        System.out.println(spec);
-        return spec;
-    }
-    
-    private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOuputOp, JobSpecification spec, JobConf conf) throws Exception {
-        IOperatorDescriptor mapper = getMapper(conf,spec,previousOuputOp);
-        IOperatorDescriptor mapSideOutputOp = addCombiner(mapper,conf,spec);
-        IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
-        return reducer; 
-    }
+	public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
+			JobSpecification spec, boolean useAsCombiner) {
+		HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
+				spec, conf, null, new ClasspathBasedHadoopClassFactory(),
+				useAsCombiner);
+		return reduceOp;
+	}
 
-    public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
-        InMemorySortOperatorDescriptor inMemorySortOp = null;
-        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
-                .getMapOutputValueClass().getName());
-        Class<? extends RawComparator> rawComparatorClass = null;
-        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
-                WritableComparable.class));
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                writableComparator.getClass());
-        inMemorySortOp = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
-        return inMemorySortOp;
-    }
+	public FileSystem getHDFSClient() {
+		FileSystem fileSystem = null;
+		try {
+			fileSystem = FileSystem.get(jobConf);
+		} catch (IOException ioe) {
+			ioe.printStackTrace();
+		}
+		return fileSystem;
+	}
 
-    public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
-        ExternalSortOperatorDescriptor externalSortOp = null;
-        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
-                .getMapOutputValueClass().getName());
-        Class<? extends RawComparator> rawComparatorClass = null;
-        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
-                WritableComparable.class));
-        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
-                writableComparator.getClass());
-        externalSortOp = new ExternalSortOperatorDescriptor(spec,conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT,DEFAULT_EX_SORT_FRAME_LIMIT),new int[] { 0 },
-                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
-        return externalSortOp;
-    }
-    
-    public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
-            JobSpecification spec) {
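+	// A single JobConf is translated directly; multiple JobConfs are compiled
+	// into one pipelined specification.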
+	public JobSpecification getJobSpecification(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = null;
+		if (jobConfs.size() == 1) {
+			spec = getJobSpecification(jobConfs.get(0));
+		} else {
+			spec = getPipelinedSpec(jobConfs);
+		}
+		return spec;
+	}
 
-        Class mapOutputKeyClass = conf.getMapOutputKeyClass();
-        Class mapOutputValueClass = conf.getMapOutputValueClass();
+	private IOperatorDescriptor configureOutput(
+			IOperatorDescriptor previousOperator, JobConf conf,
+			JobSpecification spec) throws Exception {
+		int instanceCountPreviousOperator = operatorInstanceCount
+				.get(previousOperator.getOperatorId());
+		int numOutputters = conf.getNumReduceTasks() != 0 ? conf
+				.getNumReduceTasks() : instanceCountPreviousOperator;
+		HadoopWriteOperatorDescriptor writer = null;
+		writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
+		configurePartitionCountConstraint(spec, writer, numOutputters);
+		spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
+				0, writer, 0);
+		return writer;
+	}
 
-        MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
-        ITuplePartitionComputerFactory factory = null;
-        conf.getMapOutputKeyClass();
-        if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
-            Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
-            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
-                    .createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
-                    .createSerializerDeserializer(mapOutputValueClass));
-        } else {
-            RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
-                    mapOutputValueClass);
-            ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
-                    .createSerializerDeserializer(mapOutputKeyClass);
-            factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDerserializer);
-        }
-        connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec, factory);
-        return connectorDescriptor;
-    }
+	private int getInstanceCount(IOperatorDescriptor operator) {
+		return operatorInstanceCount.get(operator.getOperatorId());
+	}
+
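+	// When the job configures a combiner class, a map-side external sort
+	// followed by a reducer running in combiner mode is inserted between the
+	// mapper and the shuffle.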
+	private IOperatorDescriptor addCombiner(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		boolean useCombiner = (jobConf.getCombinerClass() != null);
+		IOperatorDescriptor mapSideOutputOp = previousOperator;
+		if (useCombiner) {
+			System.out.println("Using Combiner:"
+					+ jobConf.getCombinerClass().getName());
+			IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
+					jobConf, spec);
+			configurePartitionCountConstraint(spec, mapSideCombineSortOp,
+					getInstanceCount(previousOperator));
+
+			HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
+					jobConf, spec, true);
+			configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
+					getInstanceCount(previousOperator));
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					previousOperator, 0, mapSideCombineSortOp, 0);
+			spec.connect(new OneToOneConnectorDescriptor(spec),
+					mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
+			mapSideOutputOp = mapSideCombineReduceOp;
+		}
+		return mapSideOutputOp;
+	}
+
+	private int getNumReduceTasks(JobConf jobConf) {
+		int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
+		return numReduceTasks;
+	}
+
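+	// When the job has reduce tasks, map output is shuffled through an M-to-N
+	// hash-partitioning connector into an external sort and then into the
+	// reducer; the reducer count is capped by maxReducers.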
+	private IOperatorDescriptor addReducer(
+			IOperatorDescriptor previousOperator, JobConf jobConf,
+			JobSpecification spec) throws Exception {
+		IOperatorDescriptor mrOutputOperator = previousOperator;
+		if (jobConf.getNumReduceTasks() != 0) {
+			IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
+			HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
+					false);
+			int numReduceTasks = getNumReduceTasks(jobConf);
+			configurePartitionCountConstraint(spec, sorter, numReduceTasks);
+			configurePartitionCountConstraint(spec, reducer, numReduceTasks);
+
+			IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
+					jobConf, spec);
+			spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
+					0);
+			spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+					reducer, 0);
+			mrOutputOperator = reducer;
+		}
+		return mrOutputOperator;
+	}
+
+	private long getInputSize(Object[] splits, JobConf conf)
+			throws IOException, InterruptedException {
+		long totalInputSize = 0;
+		if (conf.getUseNewMapper()) {
+			for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		} else {
+			for (InputSplit split : (InputSplit[]) splits) {
+				totalInputSize += split.getLength();
+			}
+		}
+		return totalInputSize;
+	}
+
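+	// If the initial split count exceeds the desired number of mappers, the
+	// splits are recomputed after raising mapred.min.split.size to
+	// totalInputSize / desiredMaxMappers and capping the map task count.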
+	private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
+			throws Exception {
+		Object[] splits = getInputSplits(conf);
+		if (splits.length > desiredMaxMappers) {
+			long totalInputSize = getInputSize(splits, conf);
+			long goalSize = (totalInputSize / desiredMaxMappers);
+			conf.setLong("mapred.min.split.size", goalSize);
+			conf.setNumMapTasks(desiredMaxMappers);
+			splits = getInputSplits(conf);
+		}
+		return splits;
+	}
+
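+	// Chains several MapReduce jobs into one Hyracks job: the output operator
+	// of each stage feeds the next stage's mapper, and the final stage is
+	// wired to an output writer.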
+	public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
+			throws Exception {
+		JobSpecification spec = new JobSpecification();
+		Iterator<JobConf> iterator = jobConfs.iterator();
+		JobConf firstMR = iterator.next();
+		IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
+		while (iterator.hasNext()) {
+			mrOutputOp = configureMapReduce(mrOutputOp, spec, iterator.next());
+		}
+		configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
+		return spec;
+	}
+
+	public JobSpecification getJobSpecification(JobConf conf) throws Exception {
+		JobSpecification spec = new JobSpecification();
+		IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
+		IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
+		spec.addRoot(printer);
+		System.out.println(spec);
+		return spec;
+	}
+
+	private IOperatorDescriptor configureMapReduce(
+			IOperatorDescriptor previousOuputOp, JobSpecification spec,
+			JobConf conf) throws Exception {
+		IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp);
+		IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
+		IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+		return reducer;
+	}
+
+	public static InMemorySortOperatorDescriptor getInMemorySorter(
+			JobConf conf, JobSpecification spec) {
+		InMemorySortOperatorDescriptor inMemorySortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return inMemorySortOp;
+	}
+
+	public static ExternalSortOperatorDescriptor getExternalSorter(
+			JobConf conf, JobSpecification spec) {
+		ExternalSortOperatorDescriptor externalSortOp = null;
+		RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
+				.getMapOutputKeyClass().getName(), conf
+				.getMapOutputValueClass().getName());
+		Class<? extends RawComparator> rawComparatorClass = null;
+		WritableComparator writableComparator = WritableComparator.get(conf
+				.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+		WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+				writableComparator.getClass());
+		externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
+				HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { comparatorFactory },
+				recordDescriptor);
+		return externalSortOp;
+	}
+
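+	// Uses the job's custom Partitioner when one outside the
+	// org.apache.hadoop packages is configured; otherwise partitions by
+	// hashing the serialized map output key.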
+	public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
+			JobConf conf, JobSpecification spec) {
+
+		Class mapOutputKeyClass = conf.getMapOutputKeyClass();
+		Class mapOutputValueClass = conf.getMapOutputValueClass();
+
+		MToNHashPartitioningConnectorDescriptor connectorDescriptor = null;
+		ITuplePartitionComputerFactory factory = null;
+		conf.getMapOutputKeyClass();
+		if (conf.getPartitionerClass() != null
+				&& !conf.getPartitionerClass().getName().startsWith(
+						"org.apache.hadoop")) {
+			Class<? extends Partitioner> partitioner = conf
+					.getPartitionerClass();
+			factory = new HadoopPartitionerTuplePartitionComputerFactory(
+					partitioner, DatatypeHelper
+							.createSerializerDeserializer(mapOutputKeyClass),
+					DatatypeHelper
+							.createSerializerDeserializer(mapOutputValueClass));
+		} else {
+			RecordDescriptor recordDescriptor = DatatypeHelper
+					.createKeyValueRecordDescriptor(mapOutputKeyClass,
+							mapOutputValueClass);
+			ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
+					.createSerializerDeserializer(mapOutputKeyClass);
+			factory = new HadoopHashTuplePartitionComputerFactory(
+					mapOutputKeySerializerDerserializer);
+		}
+		connectorDescriptor = new MToNHashPartitioningConnectorDescriptor(spec,
+				factory);
+		return connectorDescriptor;
+	}
 
 }