Modified HadoopAdapter (the component responsible for converting a JobConf into a JobSpec) to use the new mechanism for configuring partition-count constraints on operators.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@307 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index 701e24a..d0df7f1 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -4,6 +4,8 @@
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -23,6 +25,7 @@
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,6 +49,7 @@
 
     public static final String FS_DEFAULT_NAME = "fs.default.name";
     private JobConf jobConf;
+    private Map<OperatorDescriptorId,Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId,Integer>();
     public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT"; 
     public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
     public static final int DEFAULT_MAX_MAPPERS = 40;
@@ -128,7 +132,12 @@
       	InputFormat inputFormat = conf.getInputFormat();
     	return inputFormat.getSplits(conf, conf.getNumMapTasks());
     }
-    
+   
+    private void configurePartitionCountConstraint(JobSpecification spec, IOperatorDescriptor operator,int instanceCount){
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, instanceCount);
+        operatorInstanceCount.put(operator.getOperatorId(),instanceCount);
+    }
+
     public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
             throws Exception {
         boolean selfRead = previousOp == null;
@@ -137,12 +146,12 @@
         if(selfRead) {
             Object [] splits = getInputSplits(conf,maxMappers);
             mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
-            PartitionConstraintHelper.addPartitionCountConstraint(spec, mapOp, splits.length);
+	    configurePartitionCountConstraint(spec,mapOp,splits.length);
             System.out.println("No of  mappers :" + splits.length);
         } else {
-            Object [] splits = getInputSplits(conf,maxMappers);
-            PartitionConstraintHelper.addPartitionCountConstraint(spec, mapOp, splits.length);
+	    configurePartitionCountConstraint(spec,mapOp,getInstanceCount(previousOp));
             mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
+            spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, mapOp, 0);
         }
         return mapOp;
     }
@@ -173,29 +182,34 @@
         return spec;
     }
 
-    private IOperatorDescriptor configureOutput(int partitionCount, IOperatorDescriptor previousOperator, JobConf conf,
+    private IOperatorDescriptor configureOutput( IOperatorDescriptor previousOperator, JobConf conf,
             JobSpecification spec) throws Exception {
-        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : partitionCount;
+	int instanceCountPreviousOperator = operatorInstanceCount.get(previousOperator.getOperatorId());
+        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : instanceCountPreviousOperator;
         HadoopWriteOperatorDescriptor writer = null;
         writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, numOutputters);
+	configurePartitionCountConstraint(spec,writer,numOutputters);
         spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
         return writer;
     }
 
+
+    private int getInstanceCount(IOperatorDescriptor operator) {
+        return operatorInstanceCount.get(operator.getOperatorId());
+    } 
+
     private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
             JobSpecification spec) throws Exception {
         boolean useCombiner = (jobConf.getCombinerClass() != null);
         IOperatorDescriptor mapSideOutputOp = previousOperator;
         if (useCombiner) {
             System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
-            PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
             IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
-            mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
+	    configurePartitionCountConstraint(spec,mapSideCombineSortOp,getInstanceCount(previousOperator));
     
             HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
-            mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
-                  spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
+	    configurePartitionCountConstraint(spec,mapSideCombineReduceOp,getInstanceCount(previousOperator));
+            spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
             mapSideOutputOp = mapSideCombineSortOp;
         }
@@ -215,8 +229,8 @@
             HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
             int numReduceTasks = getNumReduceTasks(jobConf);
             System.out.println("No of Reducers :" + numReduceTasks);
-            PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, numReduceTasks);
-            PartitionConstraintHelper.addPartitionCountConstraint(spec, reducer, numReduceTasks);
+	    configurePartitionCountConstraint(spec,sorter,numReduceTasks);
+	    configurePartitionCountConstraint(spec,reducer,numReduceTasks);
     
             IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
             spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);