Fixed Split instantiation
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@140 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 89ee92f..4d23b46 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -117,7 +117,7 @@
RecordReader hadoopRecordReader;
Writable key;
Writable value;
- InputSplit[] splits = inputSplitsProxy.toInputSplits();
+ InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
InputSplit inputSplit = splits[partition];
Class inputFormatClass = Class.forName(inputFormatClassName);
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index ba23c4b..c685724 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -22,6 +22,8 @@
import java.io.Serializable;
import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
public class InputSplitsProxy implements Serializable {
private static final long serialVersionUID = 1L;
@@ -41,11 +43,12 @@
bytes = baos.toByteArray();
}
- public InputSplit[] toInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+ public InputSplit[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
+ IOException {
InputSplit[] splits = new InputSplit[isClasses.length];
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
for (int i = 0; i < splits.length; ++i) {
- splits[i] = isClasses[i].newInstance();
+ splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
splits[i].readFields(dis);
}
return splits;