Updated HadoopAdapter (the component responsible for converting a JobConf into a JobSpec) to use the new mechanism for configuring partition constraints on operators.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@307 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
index 701e24a..d0df7f1 100644
--- a/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
+++ b/hyracks-hadoop-compat/src/main/java/edu/uci/ics/hyracks/hadoop/compat/util/HadoopAdapter.java
@@ -4,6 +4,8 @@
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -23,6 +25,7 @@
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,6 +49,7 @@
public static final String FS_DEFAULT_NAME = "fs.default.name";
private JobConf jobConf;
+ private Map<OperatorDescriptorId,Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId,Integer>();
public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
public static final int DEFAULT_MAX_MAPPERS = 40;
@@ -128,7 +132,12 @@
InputFormat inputFormat = conf.getInputFormat();
return inputFormat.getSplits(conf, conf.getNumMapTasks());
}
-
+
+ private void configurePartitionCountConstraint(JobSpecification spec, IOperatorDescriptor operator,int instanceCount){
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, instanceCount);
+ operatorInstanceCount.put(operator.getOperatorId(),instanceCount);
+ }
+
public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
throws Exception {
boolean selfRead = previousOp == null;
@@ -137,12 +146,12 @@
if(selfRead) {
Object [] splits = getInputSplits(conf,maxMappers);
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, mapOp, splits.length);
+ configurePartitionCountConstraint(spec,mapOp,splits.length);
System.out.println("No of mappers :" + splits.length);
} else {
-                Object [] splits = getInputSplits(conf,maxMappers);
-                PartitionConstraintHelper.addPartitionCountConstraint(spec, mapOp, splits.length);
 	        mapOp = new HadoopMapperOperatorDescriptor(spec,conf,classFactory);
+	        configurePartitionCountConstraint(spec,mapOp,getInstanceCount(previousOp));
+	        spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0, mapOp, 0);
}
return mapOp;
}
@@ -173,29 +182,34 @@
return spec;
}
- private IOperatorDescriptor configureOutput(int partitionCount, IOperatorDescriptor previousOperator, JobConf conf,
+ private IOperatorDescriptor configureOutput( IOperatorDescriptor previousOperator, JobConf conf,
JobSpecification spec) throws Exception {
- int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : partitionCount;
+ int instanceCountPreviousOperator = operatorInstanceCount.get(previousOperator.getOperatorId());
+ int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : instanceCountPreviousOperator;
HadoopWriteOperatorDescriptor writer = null;
writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, numOutputters);
+ configurePartitionCountConstraint(spec,writer,numOutputters);
spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
return writer;
}
+
+ private int getInstanceCount(IOperatorDescriptor operator) {
+ return operatorInstanceCount.get(operator.getOperatorId());
+ }
+
private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
JobSpecification spec) throws Exception {
boolean useCombiner = (jobConf.getCombinerClass() != null);
IOperatorDescriptor mapSideOutputOp = previousOperator;
if (useCombiner) {
System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
- PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
- mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
+ configurePartitionCountConstraint(spec,mapSideCombineSortOp,getInstanceCount(previousOperator));
HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
- mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
- spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
+ configurePartitionCountConstraint(spec,mapSideCombineReduceOp,getInstanceCount(previousOperator));
+ spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
mapSideOutputOp = mapSideCombineSortOp;
}
@@ -215,8 +229,8 @@
HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
int numReduceTasks = getNumReduceTasks(jobConf);
System.out.println("No of Reducers :" + numReduceTasks);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, numReduceTasks);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, reducer, numReduceTasks);
+ configurePartitionCountConstraint(spec,sorter,numReduceTasks);
+ configurePartitionCountConstraint(spec,reducer,numReduceTasks);
IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);