Fixed Split instantiation

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@140 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 89ee92f..4d23b46 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -117,7 +117,7 @@
                     RecordReader hadoopRecordReader;
                     Writable key;
                     Writable value;
-                    InputSplit[] splits = inputSplitsProxy.toInputSplits();
+                    InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
                     InputSplit inputSplit = splits[partition];
                     Class inputFormatClass = Class.forName(inputFormatClassName);
                     InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index ba23c4b..c685724 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -22,6 +22,8 @@
 import java.io.Serializable;
 
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class InputSplitsProxy implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -41,11 +43,12 @@
         bytes = baos.toByteArray();
     }
 
-    public InputSplit[] toInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+    public InputSplit[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
+            IOException {
         InputSplit[] splits = new InputSplit[isClasses.length];
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
         for (int i = 0; i < splits.length; ++i) {
-            splits[i] = isClasses[i].newInstance();
+            splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
             splits[i].readFields(dis);
         }
         return splits;