Optimized Hadoop File Reader

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@136 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 82d0e9e..c12e663 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 
 public interface IActivityNode extends Serializable {
@@ -26,5 +27,5 @@
     public IOperatorDescriptor getOwner();
 
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions);
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
deleted file mode 100644
index a9538d0..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
-
-public abstract class AbstractHadoopFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-    
-    public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec,
-            RecordDescriptor recordDescriptor) {
-        super(spec, 0, 1);
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    protected abstract IRecordReader createRecordReader(InputSplit fileSplit, RecordDescriptor desc)
-            throws Exception;
-
-    
-    protected Reporter createReporter() {
-        return new Reporter() {
-            @Override
-            public Counter getCounter(Enum<?> name) {
-                return null;
-            }
-
-            @Override
-            public Counter getCounter(String group, String name) {
-                return null;
-            }
-
-            @Override
-            public InputSplit getInputSplit() throws UnsupportedOperationException {
-                return null;
-            }
-
-            @Override
-            public void incrCounter(Enum<?> key, long amount) {
-
-            }
-
-            @Override
-            public void incrCounter(String group, String counter, long amount) {
-
-            }
-
-            @Override
-            public void progress() {
-
-            }
-
-            @Override
-            public void setStatus(String status) {
-
-            }
-        };
-    }
-
- 
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index e7fd9c1..89ee92f 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -15,271 +15,158 @@
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitHandler;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class HadoopReadOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
 
-	protected transient InputSplit[] splits;
-	protected transient Path splitFilePath;
-	protected transient JobConf jobConf;
-	protected transient Reporter reporter;
+    private String inputFormatClassName;
+    private Map<String, String> jobConfMap;
+    private InputSplitsProxy inputSplitsProxy;
 
-	private static final long serialVersionUID = 1L;
-	private String inputFormatClassName;
-	private Map<String, String> jobConfMap;
-	private static String splitDirectory = "/tmp/splits";
+    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec) throws IOException {
+        super(spec, 0, 1);
+        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
+        InputFormat inputFormat = jobConf.getInputFormat();
+        InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
+        recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
+                .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
+        this.setPartitionConstraint(new PartitionCountConstraint(splits.length));
+        inputSplitsProxy = new InputSplitsProxy(splits);
+        this.inputFormatClassName = inputFormat.getClass().getName();
+    }
 
-	private void initialize() {
-		try {
-			reporter = createReporter();
-			if (jobConf == null) {
-				jobConf = DatatypeHelper.map2JobConf(jobConfMap);
-			}
-			splits = InputSplitHandler.getInputSplits(jobConf, splitFilePath);
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-		}
-	}
+    protected Reporter createReporter() {
+        return new Reporter() {
+            @Override
+            public Counter getCounter(Enum<?> name) {
+                return null;
+            }
 
-	protected class FileScanOperator implements IOpenableDataWriterOperator {
-		private IOpenableDataWriter<Object[]> writer;
+            @Override
+            public Counter getCounter(String group, String name) {
+                return null;
+            }
 
-		private int index;
+            @Override
+            public InputSplit getInputSplit() throws UnsupportedOperationException {
+                return null;
+            }
 
-		FileScanOperator(int index) {
-			this.index = index;
-		}
+            @Override
+            public void incrCounter(Enum<?> key, long amount) {
 
-		@Override
-		public void setDataWriter(int index,
-				IOpenableDataWriter<Object[]> writer) {
-			if (index != 0) {
-				throw new IndexOutOfBoundsException("Invalid index: " + index);
-			}
-			this.writer = writer;
-		}
+            }
 
-		@Override
-		public void open() throws HyracksDataException {
-			if (splits == null) {
-				// initialize splits by reading from split file
-				initialize();
-			}
-			InputSplit split = splits[index];
-			RecordDescriptor desc = recordDescriptors[0];
-			try {
-				IRecordReader reader = createRecordReader(split, desc);
-				if (desc == null) {
-					desc = recordDescriptors[0];
-				}
-				writer.open();
-				try {
-					while (true) {
-						Object[] record = new Object[desc.getFields().length];
-						if (!reader.read(record)) {
-							break;
-						}
-						writer.writeData(record);
-					}
-				} finally {
-					reader.close();
-					writer.close();
-					splitFilePath = null;
-				}
-			} catch (Exception e) {
-				throw new HyracksDataException(e);
-			}
-		}
+            @Override
+            public void incrCounter(String group, String counter, long amount) {
 
-		@Override
-		public void close() throws HyracksDataException {
-			// do nothing
-			splitFilePath = null;
+            }
 
-		}
+            @Override
+            public void progress() {
 
-		@Override
-		public void writeData(Object[] data) throws HyracksDataException {
-			throw new UnsupportedOperationException();
-		}
-	}
+            }
 
-	protected class HDFSCustomReader implements IRecordReader {
-		private RecordReader hadoopRecordReader;
-		private Object key;
-		private Object value;
-		private FileSystem fileSystem;
+            @Override
+            public void setStatus(String status) {
 
-		public HDFSCustomReader(Map<String, String> jobConfMap,
-				InputSplit inputSplit, String inputFormatClassName,
-				Reporter reporter) {
-			try {
-				JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
-				try {
-					fileSystem = FileSystem.get(conf);
-				} catch (IOException ioe) {
-					ioe.printStackTrace();
-				}
+            }
+        };
+    }
 
-				Class inputFormatClass = Class.forName(inputFormatClassName);
-				InputFormat inputFormat = (InputFormat) ReflectionUtils
-						.newInstance(inputFormatClass, conf);
-				hadoopRecordReader = (RecordReader) inputFormat
-						.getRecordReader(inputSplit, conf, reporter);
-				Class inputKeyClass;
-				Class inputValueClass;
-				if (hadoopRecordReader instanceof SequenceFileRecordReader) {
-					inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader)
-							.getKeyClass();
-					inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader)
-							.getValueClass();
-				} else {
-					inputKeyClass = hadoopRecordReader.createKey().getClass();
-					inputValueClass = hadoopRecordReader.createValue()
-							.getClass();
-				}
-				key = inputKeyClass.newInstance();
-				value = inputValueClass.newInstance();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-
-		@Override
-		public void close() {
-			try {
-				hadoopRecordReader.close();
-				if (fileSystem != null) {
-					fileSystem.delete(splitFilePath);
-				}
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-		}
-
-		@Override
-		public boolean read(Object[] record) throws Exception {
-			if (!hadoopRecordReader.next(key, value)) {
-				return false;
-			}
-			if (record.length == 1) {
-				record[0] = value;
-			} else {
-				record[0] = key;
-				record[1] = value;
-			}
-			return true;
-		}
-	}
-
-	public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec)
-			throws IOException {
-		super(spec, 0, 1);
-		this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
-		InputFormat inputFormat = jobConf.getInputFormat();
-		InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf
-				.getNumMapTasks());
-		RecordReader recordReader = inputFormat.getRecordReader(splits[0],
-				jobConf, createReporter());
-		super.recordDescriptors[0] = DatatypeHelper
-				.createKeyValueRecordDescriptor(
-						(Class<? extends Writable>) recordReader.createKey()
-								.getClass(),
-						(Class<? extends Writable>) recordReader.createValue()
-								.getClass());
-		String suffix = "" + System.currentTimeMillis();
-		splitFilePath = new Path(splitDirectory, suffix);
-		InputSplitHandler.writeSplitFile(splits, jobConf, splitFilePath);
-		this
-				.setPartitionConstraint(new PartitionCountConstraint(
-						splits.length));
-		this.inputFormatClassName = inputFormat.getClass().getName();
-	}
-
-	protected IRecordReader createRecordReader(InputSplit fileSplit,
-			RecordDescriptor desc) throws Exception {
-		IRecordReader recordReader = new HDFSCustomReader(jobConfMap,
-				fileSplit, inputFormatClassName, reporter);
-		return recordReader;
-	}
-
-	protected Reporter createReporter() {
-		return new Reporter() {
-			@Override
-			public Counter getCounter(Enum<?> name) {
-				return null;
-			}
-
-			@Override
-			public Counter getCounter(String group, String name) {
-				return null;
-			}
-
-			@Override
-			public InputSplit getInputSplit()
-					throws UnsupportedOperationException {
-				return null;
-			}
-
-			@Override
-			public void incrCounter(Enum<?> key, long amount) {
-
-			}
-
-			@Override
-			public void incrCounter(String group, String counter, long amount) {
-
-			}
-
-			@Override
-			public void progress() {
-
-			}
-
-			@Override
-			public void setStatus(String status) {
-
-			}
-		};
-	}
-
-	@Override
-	public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
-			IOperatorEnvironment env,
-			IRecordDescriptorProvider recordDescProvider, int partition,
-			int nPartitions) {
-		return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(
-				partition), null);
-	}
-
-}
+    @SuppressWarnings("deprecation")
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+                    RecordReader hadoopRecordReader;
+                    Writable key;
+                    Writable value;
+                    InputSplit[] splits = inputSplitsProxy.toInputSplits();
+                    InputSplit inputSplit = splits[partition];
+                    Class inputFormatClass = Class.forName(inputFormatClassName);
+                    InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+                    hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
+                    Class inputKeyClass;
+                    Class inputValueClass;
+                    if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+                        inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+                        inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+                    } else {
+                        inputKeyClass = hadoopRecordReader.createKey().getClass();
+                        inputValueClass = hadoopRecordReader.createValue().getClass();
+                    }
+                    key = (Writable) inputKeyClass.newInstance();
+                    value = (Writable) inputValueClass.newInstance();
+                    ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
+                    FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                    appender.reset(outBuffer, true);
+                    RecordDescriptor outputRecordDescriptor = recordDescProvider.getOutputRecordDescriptor(
+                            getOperatorId(), 0);
+                    int nFields = outputRecordDescriptor.getFields().length;
+                    ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+                    while (hadoopRecordReader.next(key, value)) {
+                        tb.reset();
+                        switch (nFields) {
+                            case 2:
+                                tb.addField(outputRecordDescriptor.getFields()[0], key);
+                            case 1:
+                                tb.addField(outputRecordDescriptor.getFields()[1], value);
+                        }
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            FrameUtils.flushFrame(outBuffer, writer);
+                            appender.reset(outBuffer, true);
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                throw new IllegalStateException();
+                            }
+                        }
+                    }
+                    hadoopRecordReader.close();
+                } catch (InstantiationException e) {
+                    throw new HyracksDataException(e);
+                } catch (IllegalAccessException e) {
+                    throw new HyracksDataException(e);
+                } catch (ClassNotFoundException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
deleted file mode 100644
index e6f5395..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright 2009-2010 University of California, Irvine
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop.util;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-
-public class InputSplitHandler {
-
-	private static final int CURRENT_SPLIT_FILE_VERSION = 0;
-    private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-    
-    public static InputSplit[] getInputSplits(JobConf jobConf, Path splitFilePath) throws IOException {
-    	RawSplit [] rawSplits = RawSplit.readSplitsFile(jobConf, splitFilePath);
-    	InputSplit [] inputSplits = new InputSplit[rawSplits.length];
-    	for(int i=0;i<rawSplits.length;i++){
-    		inputSplits[i] = getInputSplit(jobConf, rawSplits[i]);
-    	}
-    	return inputSplits;
-    }
-    
-    public static void writeSplitFile(InputSplit[] inputSplits, JobConf jobConf,Path splitFilePath)
-                                             throws IOException {
-    	RawSplit.writeSplits(inputSplits ,jobConf, splitFilePath);
-    }
-    
-    private static InputSplit getInputSplit(JobConf jobConf, RawSplit rawSplit) throws IOException{
-		InputSplit inputSplit = null;
-		String splitClass = rawSplit.getClassName();
-		BytesWritable split  = rawSplit.getBytes();  
-		try {
-		     inputSplit = (InputSplit) 
-		     ReflectionUtils.newInstance(jobConf.getClassByName(splitClass), jobConf);
-		} catch (ClassNotFoundException exp) {
-		     IOException wrap = new IOException("Split class " + splitClass + 
-		                                         " not found");
-		     wrap.initCause(exp);
-		     throw wrap;
-	    }
-		DataInputBuffer splitBuffer = new DataInputBuffer();
-		splitBuffer.reset(split.getBytes(), 0, split.getLength());
-		inputSplit.readFields(splitBuffer);
-		return inputSplit;
-	}
-    
-    protected static class RawSplit implements Writable {
-	    private String splitClass;
-	    private BytesWritable bytes = new BytesWritable();
-	    private String[] locations;
-	    long dataLength;
-
-	    public void setBytes(byte[] data, int offset, int length) {
-	      bytes.set(data, offset, length);
-	    }
-
-		public void setClassName(String className) {
-		   splitClass = className;
-		}
-		      
-		public String getClassName() {
-		   return splitClass;
-		}
-		      
-		public BytesWritable getBytes() {
-		   return bytes;
-		}
-
-		public void clearBytes() {
-		    bytes = null;
-		}
-		      
-		public void setLocations(String[] locations) {
-		   this.locations = locations;
-		}
-		      
-		public String[] getLocations() {
-		   return locations;
-		}
-		      
-		public void readFields(DataInput in) throws IOException {
-		   splitClass = Text.readString(in);
-		   dataLength = in.readLong();
-		   bytes.readFields(in);
-		   int len = WritableUtils.readVInt(in);
-		   locations = new String[len];
-		   for(int i=0; i < len; ++i) {
-		       locations[i] = Text.readString(in);
-		   }
-        }
-		      
-		public void write(DataOutput out) throws IOException {
-		   Text.writeString(out, splitClass);
-		   out.writeLong(dataLength);
-		   bytes.write(out);
-		   WritableUtils.writeVInt(out, locations.length);
-		  for(int i = 0; i < locations.length; i++) {
-		     Text.writeString(out, locations[i]);
-		  }        
-		}
-
-        public long getDataLength() {
-	       return dataLength;
-	    }
-	
-        public void setDataLength(long l) {
-		    dataLength = l;
-		}
-		    
-		public static RawSplit[] readSplitsFile(JobConf conf, Path splitFilePath) throws IOException{
-		    FileSystem fs = FileSystem.get(conf);
-		   	DataInputStream splitFile =
-		    fs.open(splitFilePath);
-		    try {
-		    	byte[] header = new byte[SPLIT_FILE_HEADER.length];
-		    	splitFile.readFully(header);
-		        if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
-		          throw new IOException("Invalid header on split file");
-		        }
-		        int vers = WritableUtils.readVInt(splitFile);
-		        if (vers != CURRENT_SPLIT_FILE_VERSION) {
-		          throw new IOException("Unsupported split version " + vers);
-		        }
-		        int len = WritableUtils.readVInt(splitFile);
-		        RawSplit[] result = new RawSplit[len];
-		        for(int i=0; i < len; ++i) {
-		          result[i] = new RawSplit();
-		          result[i].readFields(splitFile);
-		        }
-		        return result;
-		    	
-		    } finally {
-		      splitFile.close();
-		    }
-		   }
-		   
-		  public static int writeSplits(InputSplit[] splits, JobConf job, 
-		            Path submitSplitFile) throws IOException {
-			// sort the splits into order based on size, so that the biggest
-			// go first
-			Arrays.sort(splits, new Comparator<InputSplit>() {
-				public int compare(InputSplit a, InputSplit b) {
-				    try {
-					   long left = a.getLength();	
-						
-					   long right = b.getLength();
-					   if (left == right) {
-						   return 0;
-					   } else if (left < right) {
-						   return 1;
-					   } else {
-						   return -1;
-					   }
-				    } catch (IOException ie) {
-				    	throw new RuntimeException("Problem getting input split size",
-				                     ie);
-				    }
-			    }
-			});
-			DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
-			try {
-				DataOutputBuffer buffer = new DataOutputBuffer();
-				RawSplit rawSplit = new RawSplit();
-				for(InputSplit split: splits) {
-					rawSplit.setClassName(split.getClass().getName());
-					buffer.reset();
-				    split.write(buffer);
-				    rawSplit.setDataLength(split.getLength());
-				    rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-				    rawSplit.setLocations(split.getLocations());
-				    rawSplit.write(out);
-				}
-			} finally {
-			out.close();
-			}
-			return splits.length;
-		}
-
-		private static DataOutputStream writeSplitsFileHeader(Configuration conf,
-		                                                      Path filename,
-		                                                      int length
-		                                                     ) throws IOException {
-		   	FileSystem fs = filename.getFileSystem(conf);
-		   	FSDataOutputStream out = fs.create(filename);
-		   	out.write(SPLIT_FILE_HEADER);
-		   	WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-		   	WritableUtils.writeVInt(out, length);
-		   	return out;
-	    }
-	 }  
-
-}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
new file mode 100644
index 0000000..ba23c4b
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+public class InputSplitsProxy implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Class<? extends InputSplit>[] isClasses;
+    private final byte[] bytes;
+
+    public InputSplitsProxy(InputSplit[] inputSplits) throws IOException {
+        isClasses = new Class[inputSplits.length];
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        for (int i = 0; i < inputSplits.length; ++i) {
+            isClasses[i] = inputSplits[i].getClass();
+            inputSplits[i].write(dos);
+        }
+        dos.close();
+        bytes = baos.toByteArray();
+    }
+
+    public InputSplit[] toInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+        InputSplit[] splits = new InputSplit[isClasses.length];
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+        for (int i = 0; i < splits.length; ++i) {
+            splits[i] = isClasses[i].newInstance();
+            splits[i].readFields(dis);
+        }
+        return splits;
+    }
+}
\ No newline at end of file