Modified the compatibility layer to create JobSpec for a MR job , using the HadoopMapper in a self-read mode. 
Thus the jobSpec for a MR job now has 4 operators : Mapper ---M:N-->-- Sorter(External) ---1:1-->-- Reducer ---1:1:->-- Writer 
This makes the HadoopReadOperator redundant, but is not deleted as it is useful operator in othe scenarios. 

The compatibility layer uses Hadoop Mapper in 'dependent mode' when it is forming a pipeling of MR jobs, where the mapper cannot get its input on its own.

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@177 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs b/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
index fd50bc4..0b1e408 100644
--- a/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-hadoop-compat/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Tue Oct 19 11:05:30 PDT 2010
+#Tue Nov 02 17:09:03 PDT 2010
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
index 45c65a1..cc9f7d4 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksClient.java
@@ -1,31 +1,20 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
+import java.io.File;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.hadoop.compat.util.ConfigurationConstants;
 import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
 import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 
 public class HyracksClient {
 
@@ -50,25 +39,41 @@
         hadoopAdapter = new HadoopAdapter(namenodeUrl);
     }
 
-    public HyracksRunningJob submitJobs(List<JobConf> confs, String[] requiredLibs) throws Exception {
+    public HyracksRunningJob submitJobs(List<JobConf> confs, Set<String> requiredLibs) throws Exception {
         JobSpecification spec = hadoopAdapter.getJobSpecification(confs);
-        return submitJob(spec, requiredLibs);
+        String appName  = getApplicationNameHadoopJob(confs.get(0));
+        return submitJob(appName,spec, requiredLibs);
     }
 
-    public HyracksRunningJob submitJob(JobConf conf, String[] requiredLibs) throws Exception {
+    private String getApplicationNameHadoopJob(JobConf jobConf) {
+        String jar = jobConf.getJar();
+        if( jar != null){
+            return jar.substring(jar.lastIndexOf("/") >=0 ? jar.lastIndexOf("/") +1 : 0);
+        }else {
+            return "" + System.currentTimeMillis();
+        }
+    }
+    
+    public HyracksRunningJob submitJob(JobConf conf, Set<String> requiredLibs) throws Exception {
         JobSpecification spec = hadoopAdapter.getJobSpecification(conf);
-        return submitJob(spec, requiredLibs);
+        String appName  = getApplicationNameHadoopJob(conf);
+        return submitJob(appName, spec, requiredLibs);
     }
 
     public JobStatus getJobStatus(UUID jobId) throws Exception {
         return connection.getJobStatus(jobId);
     }
 
-    public HyracksRunningJob submitJob(JobSpecification spec, String[] requiredLibs) throws Exception {
-        String applicationName = "" + spec.toString().hashCode();
-        connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
-        System.out.println(" created application :" + applicationName);
-        UUID jobId = connection.createJob(applicationName, spec);
+    public HyracksRunningJob submitJob(String applicationName, JobSpecification spec, Set<String> requiredLibs) throws Exception {
+        UUID jobId = null;
+        try {
+            jobId = connection.createJob(applicationName, spec);
+        } catch (Exception e){
+            System.out.println(" application not found, creating application");
+            connection.createApplication(applicationName, Utilities.getHyracksArchive(applicationName, requiredLibs));
+            System.out.println(" created application :" + applicationName);
+            jobId = connection.createJob(applicationName, spec);
+        }
         connection.start(jobId);
         HyracksRunningJob runningJob = new HyracksRunningJob(jobId, spec, this);
         return runningJob;
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
index 79a9031..8470e12 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/client/HyracksRunningJob.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.client;
 
 import java.io.IOException;
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
index bbf0a8b..0b96041 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/driver/CompatibilityLayer.java
@@ -1,33 +1,19 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.driver;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.kohsuke.args4j.CmdLineParser;
 
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksClient;
 import edu.uci.ics.hyracks.hadoop.compat.client.HyracksRunningJob;
 import edu.uci.ics.hyracks.hadoop.compat.util.CompatibilityConfig;
@@ -35,13 +21,14 @@
 import edu.uci.ics.hyracks.hadoop.compat.util.DCacheHandler;
 import edu.uci.ics.hyracks.hadoop.compat.util.HadoopAdapter;
 import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class CompatibilityLayer {
 
     HyracksClient hyracksClient;
     DCacheHandler dCacheHander = null;
     Properties clusterConf;
-    String[] systemLibs;
+    Set<String> systemLibs;
 
     private static char configurationFileDelimiter = '=';
     private static final String dacheKeyPrefix = "dcache.key";
@@ -50,8 +37,8 @@
         initialize(clConfig);
     }
 
-    public HyracksRunningJob submitJobs(String[] jobFiles, String[] userLibs) throws Exception {
-        String[] requiredLibs = getRequiredLibs(userLibs);
+    public HyracksRunningJob submitJobs(String[] jobFiles, Set<String> userLibs) throws Exception {
+        Set<String> requiredLibs = getRequiredLibs(userLibs);
         List<JobConf> jobConfs = constructHadoopJobConfs(jobFiles);
         Map<String, String> dcacheTasks = preparePreLaunchDCacheTasks(jobFiles[0]);
         String tempDir = "/tmp";
@@ -71,27 +58,26 @@
         return hyraxRunningJob;
     }
 
-    private String[] getRequiredLibs(String[] userLibs) {
-        List<String> requiredLibs = new ArrayList<String>();
+    private Set<String> getRequiredLibs(Set<String> userLibs) {
+        Set<String> requiredLibs = new HashSet<String>();
         for (String systemLib : systemLibs) {
             requiredLibs.add(systemLib);
         }
         for (String userLib : userLibs) {
             requiredLibs.add(userLib);
         }
-        return requiredLibs.toArray(new String[] {});
+        return requiredLibs;
     }
 
     private void initialize(CompatibilityConfig clConfig) throws Exception {
         clusterConf = Utilities.getProperties(clConfig.clusterConf, configurationFileDelimiter);
-        List<String> systemLibs = new ArrayList<String>();
+        systemLibs = new HashSet<String>();
         for (String systemLib : ConfigurationConstants.systemLibs) {
             String systemLibPath = clusterConf.getProperty(systemLib);
             if (systemLibPath != null) {
                 systemLibs.add(systemLibPath);
             }
         }
-        this.systemLibs = systemLibs.toArray(new String[] {});
         String clusterControllerHost = clusterConf.getProperty(ConfigurationConstants.clusterControllerHost);
         String dacheServerConfiguration = clusterConf.getProperty(ConfigurationConstants.dcacheServerConfiguration);
         String fileSystem = clusterConf.getProperty(ConfigurationConstants.namenodeURL);
@@ -134,7 +120,7 @@
         hyracksClient.waitForCompleton(jobId);
     }
 
-    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, String[] userLibs) {
+    public HyracksRunningJob submitHadoopJobToHyrax(JobConf jobConf, Set<String> userLibs) {
         HyracksRunningJob hyraxRunningJob = null;
         List<JobConf> jobConfs = new ArrayList<JobConf>();
         jobConfs.add(jobConf);
@@ -147,10 +133,10 @@
         return hyraxRunningJob;
     }
 
-    public HyracksRunningJob submitJob(JobSpecification jobSpec, String[] userLibs) {
+    public HyracksRunningJob submitJob(String appName, JobSpecification jobSpec, Set<String> userLibs) {
         HyracksRunningJob hyraxRunningJob = null;
         try {
-            hyraxRunningJob = hyracksClient.submitJob(jobSpec, getRequiredLibs(userLibs));
+            hyraxRunningJob = hyracksClient.submitJob(appName, jobSpec, getRequiredLibs(userLibs));
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -192,7 +178,11 @@
         }
         CompatibilityLayer compatLayer = new CompatibilityLayer(clConfig);
         String[] jobFiles = compatLayer.getJobs(clConfig);
-        String[] userLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+        String[] tempUserLibs = clConfig.userLibs == null ? new String[0] : clConfig.userLibs.split(",");
+        Set<String> userLibs = new HashSet<String>();
+        for(String userLib : tempUserLibs) {
+            userLibs.add(userLib);
+        }
         HyracksRunningJob hyraxRunningJob = null;
         try {
             hyraxRunningJob = compatLayer.submitJobs(jobFiles, userLibs);
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
index cf5b2df..1dd266f 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/CompatibilityConfig.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import org.kohsuke.args4j.Option;
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
index 049191c..3b10272 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/ConfigurationConstants.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 public class ConfigurationConstants {
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
index 5b77548..045cd76 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/DCacheHandler.java
@@ -1,20 +1,8 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 import edu.uci.ics.dcache.client.DCacheClient;
 import edu.uci.ics.dcache.client.DCacheClientConfig;
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index f4fcd83..2653dc8 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.IOException;
@@ -43,7 +29,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReadOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
@@ -51,20 +36,47 @@
 import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 
 public class HadoopAdapter {
 
     public static final String FS_DEFAULT_NAME = "fs.default.name";
     private JobConf jobConf;
-
+    public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; 
+    public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_MAX_MAPPERS = 40;
+    public static final int DEFAULT_MAX_REDUCERS= 40;
+    public static final String MAX_MAPPERS_KEY = "maxMappers";
+    public static final String MAX_REDUCERS_KEY = "maxReducers";
+    public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
+    
+    private  int maxMappers = DEFAULT_MAX_MAPPERS;
+    private  int maxReducers = DEFAULT_MAX_REDUCERS;
+    private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
+    
     public HadoopAdapter(String namenodeUrl) {
         jobConf = new JobConf(true);
         jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
+        if(System.getenv(MAX_MAPPERS_KEY) != null) {
+            maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
+        }
+        if(System.getenv(MAX_REDUCERS_KEY) != null) {
+            maxReducers= Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
+        }
+        if(System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
+            exSortFrame= Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
+        }
     }
 
+    private String getEnvironmentVariable(String key, String def) {
+        String ret =  System.getenv(key);
+        return ret != null ? ret : def;
+    }
+    
     public JobConf getConf() {
         return jobConf;
     }
@@ -79,9 +91,8 @@
     private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
         RecordDescriptor recordDescriptor = null;
         try {
-            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                    (Class<? extends Writable>) Class.forName(className1),
-                    (Class<? extends Writable>) Class.forName(className2));
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                    .forName(className1), (Class<? extends Writable>) Class.forName(className2));
         } catch (ClassNotFoundException cnfe) {
             cnfe.printStackTrace();
         }
@@ -93,10 +104,22 @@
         return inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
     }
 
-    public HadoopMapperOperatorDescriptor getMapper(JobConf conf, InputSplit[] splits, JobSpecification spec)
-            throws IOException {
-        HadoopMapperOperatorDescriptor mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
-                new ClasspathBasedHadoopClassFactory());
+    public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
+            throws Exception {
+        boolean selfRead = previousOp == null;
+        IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
+        HadoopMapperOperatorDescriptor mapOp = null;
+        PartitionConstraint constraint;
+        if(selfRead) {
+            InputSplit [] splits = getInputSplits(conf,maxMappers);
+            mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
+            mapOp.setPartitionConstraint(new PartitionCountConstraint(splits.length));
+            System.out.println("No of  mappers :" + splits.length);
+        } else {
+            constraint = previousOp.getPartitionConstraint();
+            mapOp.setPartitionConstraint(constraint);
+            mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
+        }
         return mapOp;
     }
 
@@ -130,7 +153,8 @@
             JobSpecification spec) throws Exception {
         PartitionConstraint previousOpConstraint = previousOperator.getPartitionConstraint();
         int noOfInputs = previousOpConstraint instanceof PartitionCountConstraint ? ((PartitionCountConstraint) previousOpConstraint)
-                .getCount() : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
+                .getCount()
+                : ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfInputs;
         HadoopWriteOperatorDescriptor writer = null;
         writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
@@ -139,73 +163,95 @@
         return writer;
     }
 
-    private IOperatorDescriptor addMRToExistingPipeline(IOperatorDescriptor previousOperator, JobConf jobConf,
-            JobSpecification spec, InputSplit[] splits) throws IOException {
-        HadoopMapperOperatorDescriptor mapOp = getMapper(jobConf, splits, spec);
-        IOperatorDescriptor mrOutputOperator = mapOp;
-        PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
-        mapOp.setPartitionConstraint(mapperPartitionConstraint);
-        spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapOp, 0);
-        IOperatorDescriptor mapOutputOperator = mapOp;
-
+    private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
+            JobSpecification spec) throws Exception {
         boolean useCombiner = (jobConf.getCombinerClass() != null);
+        IOperatorDescriptor mapSideOutputOp = previousOperator;
         if (useCombiner) {
             System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
-            InMemorySortOperatorDescriptor mapSideCombineSortOp = getSorter(jobConf, spec);
+            PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
+            IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
             mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
-
+    
             HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
             mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
-            mapOutputOperator = mapSideCombineReduceOp;
-
-            spec.connect(new OneToOneConnectorDescriptor(spec), mapOp, 0, mapSideCombineSortOp, 0);
+                  spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
-            mrOutputOperator = mapSideCombineReduceOp;
+            mapSideOutputOp = mapSideCombineSortOp;
         }
-
+        return mapSideOutputOp;
+    }
+    
+    private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
+            JobSpecification spec) throws Exception {
+        IOperatorDescriptor mrOutputOperator = previousOperator;
         if (jobConf.getNumReduceTasks() != 0) {
-            IOperatorDescriptor sorter = getSorter(jobConf, spec);
+            IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
             HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
-
-            PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(jobConf.getNumReduceTasks());
+            int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
+            System.out.println("No of Reducers :" + numReduceTasks);
+            PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(numReduceTasks);
             sorter.setPartitionConstraint(reducerPartitionConstraint);
             reducer.setPartitionConstraint(reducerPartitionConstraint);
-
+    
             IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
-            spec.connect(mToNConnectorDescriptor, mapOutputOperator, 0, sorter, 0);
+            spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
             mrOutputOperator = reducer;
-        }
+        }   
         return mrOutputOperator;
     }
-
+    
+    private InputSplit[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
+        InputSplit[] splits = getInputSplits(conf);
+        System.out.println(" initial split count :" + splits.length);
+        System.out.println(" desired mappers :" + desiredMaxMappers);
+        if (splits.length > desiredMaxMappers) {
+            long totalInputSize = 0;
+            for (InputSplit split : splits) {
+                totalInputSize += split.getLength();
+            }
+            long goalSize = (totalInputSize/desiredMaxMappers);
+            System.out.println(" total input length :" + totalInputSize);
+            System.out.println(" goal size :" + goalSize);
+            conf.setLong("mapred.min.split.size", goalSize);
+            conf.setNumMapTasks(desiredMaxMappers);
+            splits = getInputSplits(conf);
+            System.out.println(" revised split count :" + splits.length);
+        }
+        return splits; 
+    }
+    
     public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
         JobSpecification spec = new JobSpecification();
         Iterator<JobConf> iterator = jobConfs.iterator();
         JobConf firstMR = iterator.next();
-        InputSplit[] splits = getInputSplits(firstMR);
-        IOperatorDescriptor reader = new HadoopReadOperatorDescriptor(firstMR, spec, splits);
-        IOperatorDescriptor outputOperator = reader;
-        outputOperator = addMRToExistingPipeline(reader, firstMR, spec, splits);
+        IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
         while (iterator.hasNext())
             for (JobConf currentJobConf : jobConfs) {
-                outputOperator = addMRToExistingPipeline(outputOperator, currentJobConf, spec, null);
+                mrOutputOp = configureMapReduce(mrOutputOp, spec , currentJobConf);
             }
-        configureOutput(outputOperator, jobConfs.get(jobConfs.size() - 1), spec);
+        configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
         return spec;
     }
 
     public JobSpecification getJobSpecification(JobConf conf) throws Exception {
         JobSpecification spec = new JobSpecification();
-        InputSplit[] splits = getInputSplits(conf);
-        HadoopReadOperatorDescriptor hadoopReadOperatorDescriptor = new HadoopReadOperatorDescriptor(conf, spec, splits);
-        IOperatorDescriptor mrOutputOperator = addMRToExistingPipeline(hadoopReadOperatorDescriptor, conf, spec, splits);
-        IOperatorDescriptor printer = configureOutput(mrOutputOperator, conf, spec);
+        IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
+        IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
         spec.addRoot(printer);
+        System.out.println(spec);
         return spec;
     }
+    
+    private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOuputOp, JobSpecification spec, JobConf conf) throws Exception {
+        IOperatorDescriptor mapper = getMapper(conf,spec,previousOuputOp);
+        IOperatorDescriptor mapSideOutputOp = addCombiner(mapper,conf,spec);
+        IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
+        return reducer; 
+    }
 
-    public static InMemorySortOperatorDescriptor getSorter(JobConf conf, JobSpecification spec) {
+    public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
         InMemorySortOperatorDescriptor inMemorySortOp = null;
         RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
                 .getMapOutputValueClass().getName());
@@ -219,6 +265,20 @@
         return inMemorySortOp;
     }
 
+    public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
+        ExternalSortOperatorDescriptor externalSortOp = null;
+        RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
+                .getMapOutputValueClass().getName());
+        Class<? extends RawComparator> rawComparatorClass = null;
+        WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
+                WritableComparable.class));
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
+                writableComparator.getClass());
+        externalSortOp = new ExternalSortOperatorDescriptor(spec,conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT,DEFAULT_EX_SORT_FRAME_LIMIT),new int[] { 0 },
+                new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
+        return externalSortOp;
+    }
+    
     public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
             JobSpecification spec) {
 
@@ -230,9 +290,9 @@
         conf.getMapOutputKeyClass();
         if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
             Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
-            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner,
-                    DatatypeHelper.createSerializerDeserializer(mapOutputKeyClass),
-                    DatatypeHelper.createSerializerDeserializer(mapOutputValueClass));
+            factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
+                    .createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
+                    .createSerializerDeserializer(mapOutputValueClass));
         } else {
             RecordDescriptor recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(mapOutputKeyClass,
                     mapOutputValueClass);
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
index d6bd652..cc9c012 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/Utilities.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.hadoop.compat.util;
 
 import java.io.BufferedReader;
@@ -21,10 +7,16 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
 public class Utilities {
 
     public static Properties getProperties(String filePath, char delimiter) {
@@ -47,15 +39,19 @@
         return properties;
     }
 
-    public static File getHyracksArchive(String applicationName, String[] libJars) {
+    public static File getHyracksArchive(String applicationName, Set<String> libJars) {
         String target = applicationName + ".zip";
         // Create a buffer for reading the files
         byte[] buf = new byte[1024];
+        Set<String> fileNames = new HashSet<String>();
         try {
             ZipOutputStream out = new ZipOutputStream(new FileOutputStream(target));
-            for (int i = 0; i < libJars.length; i++) {
-                String fileName = libJars[i].substring(libJars[i].lastIndexOf("/") + 1);
-                FileInputStream in = new FileInputStream(libJars[i]);
+            for (String libJar : libJars) {
+                String fileName = libJar.substring(libJar.lastIndexOf("/") + 1);
+                if(fileNames.contains(fileName)){
+                    continue;
+                }
+                FileInputStream in = new FileInputStream(libJar);
                 out.putNextEntry(new ZipEntry(fileName));
                 int len;
                 while ((len = in.read(buf)) > 0) {
@@ -63,13 +59,55 @@
                 }
                 out.closeEntry();
                 in.close();
+                fileNames.add(fileName);
             }
             out.close();
         } catch (IOException e) {
+            e.printStackTrace();
         }
         File har = new File(target);
         har.deleteOnExit();
         return har;
     }
+    
+    public static Reporter createReporter() {
+        Reporter reporter = new Reporter() {
 
+            @Override
+            public void progress() {
+
+            }
+
+            @Override
+            public void setStatus(String arg0) {
+
+            }
+
+            @Override
+            public void incrCounter(String arg0, String arg1, long arg2) {
+
+            }
+
+            @Override
+            public void incrCounter(Enum<?> arg0, long arg1) {
+
+            }
+
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(String arg0, String arg1) {
+                return null;
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> arg0) {
+                return null;
+            }
+        };
+        return reporter;
+    }
 }