diff --git a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
index fd50bc4..0b1e408 100644
--- a/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Tue Oct 19 11:05:30 PDT 2010
+#Tue Nov 02 17:09:03 PDT 2010
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index 45c65a1..cc9f7d4 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -1,31 +1,20 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
+import java.io.File;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
 import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
 import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 
 public class HyracksClient {
 
@@ -50,25 +39,41 @@
         hadoopAdapter = new HadoopAdapter(namenodeUrl);
     }
 
-    public HyracksRunningJob submitJobs(List<JobConf> confs, String[] requiredLibs) throws Exception {
+    public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
         JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
-        return submitJob(spec, requiredLibs);
+        String appName  = getApplicationNameHadoopJob(confs.get(0));
+        return submitJob(appName,spec, requiredLibs);
     }
 
-    public HyracksRunningJob submitJob(JobConf conf, String[] requiredLibs) throws Exception {
+    private String getApplicationNameHadoopJob(JobConf jobConf) {
+        String jar = jobConf.getJar();
+        if( jar != null){
+            return jar.substring(jar.lastIndexOf("/") >=0 ? jar.lastIndexOf("/") +1 : 0);
+        }else {
+            return "" + System.currentTimeMillis();
+        }
+    }
+    
+    public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
         JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
-        return submitJob(spec, requiredLibs);
+        String appName  = getApplicationNameHadoopJob(conf);
+        return submitJob(appName, spec, requiredLibs);
     }
 
     public JobStatus getJobStatus(UUID jobId) throws Exception {
         return connection.getJobStatus(jobId);
     }
 
-    public HyracksRunningJob submitJob(JobSpecification spec, String[] requiredLibs) throws Exception {
-        String applicationName = "" + spec.toString().hashCode();
-        connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
-        System.out.println(" created application :" + applicationName);
-        UUID jobId = connection.createJob(applicationName, spec);
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs) throws Exception {
+        UUID jobId = null;
+        try {
+            jobId = connection.createJob(applicationName, spec);
+        } catch (Exception e){
+            System.out.println(" application not found, creating application");
+            connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
+            System.out.println(" created application :" + applicationName);
+            jobId = connection.createJob(applicationName, spec);
+        }
         connection.start(jobId);
         HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
         return runningJob;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
index 79a9031..8470e12 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
 import java.io.IOException;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index bbf0a8b..0b96041 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -1,33 +1,19 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.driver;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.kohsuke.args4j.CmdLineParser;
 
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
 import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
@@ -35,13 +21,14 @@
 import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
 import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
 import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class CompatibilityLayer {
 
     HyracksClient hyracksClient;
     DCacheHandler dCacheHander = null;
     Properties clusterConf;
-    String[] systemLibs;
+    Set<String> systemLibs;
 
     private static char configurationFileDelimiter = '=';
     private static final String dacheKeyPrefix = "dcache.key";
@@ -50,8 +37,8 @@
         initialize(clConfig);
     }
 
-    public HyracksRunningJob submitJobs(String[] jobFiles, String[] userLibs) throws Exception {
-        String[] requiredLibs = getRequiredLibs(userLibs);
+    public HyracksRunningJob submitJobs(String[] jobFiles, Set<String> userLibs) throws Exception {
+        Set<String> requiredLibs = getRequiredLibs(userLibs);
         List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
         Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
         String tempDir = "/tmp";
@@ -71,27 +58,26 @@
         return hyraxRunningJob;
     }
 
-    private String[] getRequiredLibs(String[] userLibs) {
-        List<String> requiredLibs = new ArrayList<String>();
+    private Set<String> getRequiredLibs(Set<String> userLibs) {
+        Set<String> requiredLibs = new HashSet<String>();
         for (String systemLib : systemLibs) {
             requiredLibs.add(systemLib);
         }
         for (String userLib : userLibs) {
             requiredLibs.add(userLib);
         }
-        return requiredLibs.toArray(new String[] {});
+        return requiredLibs;
     }
 
     private void initialize(CompatibilityConfig clConfig) throws Exception {
         clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
-        List<String> systemLibs = new ArrayList<String>();
+        systemLibs = new HashSet<String>();
         for (String systemLib : ConfigurationConstants.systemLibs) {
             String systemLibPath = clusterConf.getProperty(systemLib);
             if (systemLibPath != null) {
                 systemLibs.add(systemLibPath);
             }
         }
-        this.systemLibs = systemLibs.toArray(new String[] {});
         String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
         String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
         String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
@@ -134,7 +120,7 @@
         hyracksClient.waitForCompleton(jobId);
     }
 
-    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, String[] userLibs) {
+    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, Set<String> userLibs) {
         HyracksRunningJob hyraxRunningJob = null;
         List<JobConf> jobConfs = new ArrayList<JobConf>();
         jobConfs.add(jobConf);
@@ -147,10 +133,10 @@
         return hyraxRunningJob;
     }
 
-    public HyracksRunningJob submitJob(JobSpecification jobSpec, String[] userLibs) {
+    public HyracksRunningJob submitJob(String appName, JobSpecification jobSpec, Set<String> userLibs) {
         HyracksRunningJob hyraxRunningJob = null;
         try {
-            hyraxRunningJob = hyracksClient.submitJob(jobSpec, getRequiredLibs(userLibs));
+            hyraxRunningJob = hyracksClient.submitJob(appName, jobSpec, getRequiredLibs(userLibs));
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -192,7 +178,11 @@
         }
         CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
         String[] jobFiles = compatLayer.getJobs(clConfig);
-        String[] userLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+        String[] tempUserLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+        Set<String> userLibs = new HashSet<String>();
+        for(String userLib : tempUserLibs) {
+            userLibs.add(userLib);
+        }
         HyracksRunningJob hyraxRunningJob = null;
         try {
             hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
index cf5b2df..1dd266f 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import org.kohsuke.args4j.Option;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
index 049191c..3b10272 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 public class ConfigurationConstants {
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
index 5b77548..045cd76 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
@@ -1,20 +1,8 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 import edu.uci.ics.dcache.client.DCacheClient;
 import edu.uci.ics.dcache.client.DCacheClientConfig;
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index f4fcd83..2653dc8 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.IOException;
@@ -43,7 +29,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReadOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
@@ -51,20 +36,47 @@
 import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 
 public class HadoopAdapter {
 
     public static final String FS_DEFAULT_NAME = "fs.default.name";
     private JobConf jobConf;
-
+    public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; 
+    public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_MAX_MAPPERS = 40;
+    public static final int DEFAULT_MAX_REDUCERS= 40;
+    public static final String MAX_MAPPERS_KEY = "maxMappers";
+    public static final String MAX_REDUCERS_KEY = "maxReducers";
+    public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
+    
+    private  int maxMappers = DEFAULT_MAX_MAPPERS;
+    private  int maxReducers = DEFAULT_MAX_REDUCERS;
+    private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
+    
     public HadoopAdapter(String namenodeUrl) {
         jobConf = new JobConf(true);
         jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+        if(System.getenv(MAX_MAPPERS_KEY) != null) {
+            maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+        }
+        if(System.getenv(MAX_REDUCERS_KEY) != null) {
+            maxReducers= Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+        }
+        if(System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+            exSortFrame= Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
+        }
     }
 
+    private String getEnvironmentVariable(String key, String def) {
+        String ret =  System.getenv(key);
+        return ret != null ? ret : def;
+    }
+    
     public JobConf getConf() {
         return jobConf;
     }
@@ -79,9 +91,8 @@
     private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
         RecordDescriptor recordDescriptor = null;
         try {
-            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                    (Class<? extends Writable>) Class.forName(className1),
-                    (Class<? extends Writable>) Class.forName(className2));
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                    .forName(className1), (Class<? extends Writable>) Class.forName(className2));
         } catch (ClassNotFoundException cnfe) {
             cnfe.printStackTrace();
         }
@@ -93,10 +104,22 @@
         return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
     }
 
-    public HadoopMapperOperatorDescriptor getMapper(JobConf conf, InputSplit[] splits, JobSpecification spec)
-            throws IOException {
-        HadoopMapperOperatorDescriptor mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
-                new ClasspathBasedHadoopClassFactory());
+    public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
+            throws Exception {
+        boolean selfRead = previousOp == null;
+        IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+        HadoopMapperOperatorDescriptor mapOp = null;
+        PartitionConstraint constraint;
+        if(selfRead) {
+            InputSplit [] splits = getInputSplits(conf,maxMappers);
+            mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
+            mapOp.setPartitionConstraint(new PartitionCountConstraint(splits.length));
+            System.out.println("No of  mappers :" + splits.length);
+        } else {
+            constraint = previousOp.getPartitionConstraint();
+            mapOp.setPartitionConstraint(constraint);
+            mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
+        }
         return mapOp;
     }
 
@@ -130,7 +153,8 @@
             JobSpecification spec) throws Exception {
         PartitionConstraint previousOpConstraint = previousOperator.getPartitionConstraint();
         int noOfInputs = previousOpConstraint instanceof PartitionCountConstraint ? ((PartitionCountConstraint) previousOpConstraint)
-                .getCount() : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
+                .getCount()
+                : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfInputs;
         HadoopWriteOperatorDescriptor writer = null;
         writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
@@ -139,73 +163,95 @@
         return writer;
     }
 
-    private IOperatorDescriptor addMRToExistingPipeline(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec, InputSplit[] splits) throws IOException {
-        HadoopMapperOperatorDescriptor mapOp = getMapper(jobConf, splits, spec);
-        IOperatorDescriptor mrOutputOperator = mapOp;
-        PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
-        mapOp.setPartitionConstraint(mapperPartitionConstraint);
-        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapOp, 0);
-        IOperatorDescriptor mapOutputOperator = mapOp;
-
+    private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
+            JobSpecification spec) throws Exception {
         boolean useCombiner = (jobConf.getCombinerClass() != null);
+        IOperatorDescriptor mapSideOutputOp = previousOperator;
         if (useCombiner) {
             System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
-            InMemorySortOperatorDescriptor mapSideCombineSortOp = getSorter(jobConf, spec);
+            PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
+            IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
             mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
-
+    
             HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
             mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
-            mapOutputOperator = mapSideCombineReduceOp;
-
-            spec.connect(new OneToOneConnectorDescriptor(spec), mapOp, 0, mapSideCombineSortOp, 0);
+                  spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
-            mrOutputOperator = mapSideCombineReduceOp;
+            mapSideOutputOp = mapSideCombineSortOp;
         }
-
+        return mapSideOutputOp;
+    }
+    
+    private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
+            JobSpecification spec) throws Exception {
+        IOperatorDescriptor mrOutputOperator = previousOperator;
         if (jobConf.getNumReduceTasks() != 0) {
-            IOperatorDescriptor sorter = getSorter(jobConf, spec);
+            IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
             HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
-
-            PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(jobConf.getNumReduceTasks());
+            int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
+            System.out.println("No of Reducers :" + numReduceTasks);
+            PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(numReduceTasks);
             sorter.setPartitionConstraint(reducerPartitionConstraint);
             reducer.setPartitionConstraint(reducerPartitionConstraint);
-
+    
             IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
-            spec.connect(mToNConnectorDescriptor, mapOutputOperator, 0, sorter, 0);
+            spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
             mrOutputOperator = reducer;
-        }
+        }   
         return mrOutputOperator;
     }
-
+    
+    private InputSplit[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
+        InputSplit[] splits = getInputSplits(conf);
+        System.out.println(" initial split count :" + splits.length);
+        System.out.println(" desired mappers :" + desiredMaxMappers);
+        if (splits.length > desiredMaxMappers) {
+            long totalInputSize = 0;
+            for (InputSplit split : splits) {
+                totalInputSize += split.getLength();
+            }
+            long goalSize = (totalInputSize/desiredMaxMappers);
+            System.out.println(" total input length :" + totalInputSize);
+            System.out.println(" goal size :" + goalSize);
+            conf.setLong("mapred.min.split.size", goalSize);
+            conf.setNumMapTasks(desiredMaxMappers);
+            splits = getInputSplits(conf);
+            System.out.println(" revised split count :" + splits.length);
+        }
+        return splits; 
+    }
+    
     public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
         JobSpecification spec = new JobSpecification();
         Iterator<JobConf> iterator = jobConfs.iterator();
         JobConf firstMR = iterator.next();
-        InputSplit[] splits = getInputSplits(firstMR);
-        IOperatorDescriptor reader = new HadoopReadOperatorDescriptor(firstMR, spec, splits);
-        IOperatorDescriptor outputOperator = reader;
-        outputOperator = addMRToExistingPipeline(reader, firstMR, spec, splits);
+        IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
         while (iterator.hasNext())
             for (JobConf currentJobConf : jobConfs) {
-                outputOperator = addMRToExistingPipeline(outputOperator, currentJobConf, spec, null);
+                mrOutputOp = configureMapReduce(mrOutputOp, spec , currentJobConf);
             }
-        configureOutput(outputOperator, jobConfs.get(jobConfs.size() - 1), spec);
+        configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
         return spec;
     }
 
     public JobSpecification getJobSpecification(JobConf conf) throws Exception {
         JobSpecification spec = new JobSpecification();
-        InputSplit[] splits = getInputSplits(conf);
-        HadoopReadOperatorDescriptor hadoopReadOperatorDescriptor = new HadoopReadOperatorDescriptor(conf, spec, splits);
-        IOperatorDescriptor mrOutputOperator = addMRToExistingPipeline(hadoopReadOperatorDescriptor, conf, spec, splits);
-        IOperatorDescriptor printer = configureOutput(mrOutputOperator, conf, spec);
+        IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
+        IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
         spec.addRoot(printer);
+        System.out.println(spec);
         return spec;
     }
+    
+    private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOuputOp, JobSpecification spec, JobConf conf) throws Exception {
+        IOperatorDescriptor mapper = getMapper(conf,spec,previousOuputOp);
+        IOperatorDescriptor mapSideOutputOp = addCombiner(mapper,conf,spec);
+        IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+        return reducer; 
+    }
 
-    public static InMemorySortOperatorDescriptor getSorter(JobConf conf, JobSpecification spec) {
+    public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
         InMemorySortOperatorDescriptor inMemorySortOp = null;
         RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
                 .getMapOutputValueClass().getName());
@@ -219,6 +265,20 @@
         return inMemorySortOp;
     }
 
+    public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
+        ExternalSortOperatorDescriptor externalSortOp = null;
+        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
+                .getMapOutputValueClass().getName());
+        Class<? extends RawComparator> rawComparatorClass = null;
+        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
+                WritableComparable.class));
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+                writableComparator.getClass());
+        externalSortOp = new ExternalSortOperatorDescriptor(spec,conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT,DEFAULT_EX_SORT_FRAME_LIMIT),new int[] { 0 },
+                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
+        return externalSortOp;
+    }
+    
     public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
             JobSpecification spec) {
 
@@ -230,9 +290,9 @@
         conf.getMapOutputKeyClass();
         if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
             Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
-            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner,
-                    DatatypeHelper.createSerializerDeserializer(mapOutputKeyClass),
-                    DatatypeHelper.createSerializerDeserializer(mapOutputValueClass));
+            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
+                    .createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
+                    .createSerializerDeserializer(mapOutputValueClass));
         } else {
             RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
                     mapOutputValueClass);
diff --git a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
index d6bd652..cc9c012 100644
--- a/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
+++ b/hyracks/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.BufferedReader;
@@ -21,10 +7,16 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
 public class Utilities {
 
     public static Properties getProperties(String filePath, char delimiter) {
@@ -47,15 +39,19 @@
         return properties;
     }
 
-    public static File getHyracksArchive(String applicationName, String[] libJars) {
+    public static File getHyracksArchive(String applicationName, Set<String> libJars) {
         String target = applicationName + ".zip";
         // Create a buffer for reading the files
         byte[] buf = new byte[1024];
+        Set<String> fileNames = new HashSet<String>();
         try {
             ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
-            for (int i = 0; i < libJars.length; i++) {
-                String fileName = libJars[i].substring(libJars[i].lastIndexOf("/") + 1);
-                FileInputStream in = new FileInputStream(libJars[i]);
+            for (String libJar : libJars) {
+                String fileName = libJar.substring(libJar.lastIndexOf("/") + 1);
+                if(fileNames.contains(fileName)){
+                    continue;
+                }
+                FileInputStream in = new FileInputStream(libJar);
                 out.putNextEntry(new ZipEntry(fileName));
                 int len;
                 while ((len = in.read(buf)) > 0) {
@@ -63,13 +59,55 @@
                 }
                 out.closeEntry();
                 in.close();
+                fileNames.add(fileName);
             }
             out.close();
         } catch (IOException e) {
+            e.printStackTrace();
         }
         File har = new File(target);
         har.deleteOnExit();
         return har;
     }
+    
+    public static Reporter createReporter() {
+        Reporter reporter = new Reporter() {
 
+            @Override
+            public void progress() {
+
+            }
+
+            @Override
+            public void setStatus(String arg0) {
+
+            }
+
+            @Override
+            public void incrCounter(String arg0, String arg1, long arg2) {
+
+            }
+
+            @Override
+            public void incrCounter(Enum<?> arg0, long arg1) {
+
+            }
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String arg0, String arg1) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> arg0) {
+                return null;
+            }
+        };
+        return reporter;
+    }
 }
