1) Writing Splits:
Splits are written to a split file that is eventually persisted in HDFS, and are re-initialized from that file on the server side. This protocol is adopted because custom InputFormat implementations work with custom record readers and custom InputSplit implementations; serializing the split metadata keeps the mechanism generic enough to satisfy all such custom implementations.
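
A minimal sketch of the protocol, using the new InputSplitHandler introduced in this change (variable names are placeholders; the split directory /tmp/splits matches the default used by HadoopReadOperatorDescriptor):

    // client side: compute the splits and persist them to a split file in HDFS
    InputSplit[] splits = jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks());
    Path splitFilePath = new Path("/tmp/splits", "" + System.currentTimeMillis());
    InputSplitHandler.writeSplitFile(splits, jobConf, splitFilePath);

    // server side: re-initialize the splits from the split file
    InputSplit[] restored = InputSplitHandler.getInputSplits(jobConf, splitFilePath);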

2) HadoopReadOperatorDescriptor has been revised to remove its dependency on the base class (AbstractHadoopFileScanOperatorDescriptor), which is now redundant.
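
The reader is now constructed directly from a JobConf. A hedged usage sketch, assuming the usual org.apache.hadoop.mapred imports and an existing JobSpecification (spec); the input format and path below are illustrative:

    JobConf jobConf = new JobConf();
    jobConf.setInputFormat(TextInputFormat.class);               // assumed input format
    FileInputFormat.setInputPaths(jobConf, new Path("/input"));  // illustrative input path
    HadoopReadOperatorDescriptor reader =
            new HadoopReadOperatorDescriptor(jobConf, spec);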

3) DatatypeHelper: slight modification to use Map instead of HashMap; hashMap2JobConf() and jobConf2HashMap() are renamed to map2JobConf() and jobConf2Map() accordingly.
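
The renamed conversion methods, as used by the operator descriptors in this change:

    Map<String, String> confMap = DatatypeHelper.jobConf2Map(jobConf);  // was jobConf2HashMap
    JobConf restoredConf = DatatypeHelper.map2JobConf(confMap);         // was hashMap2JobConf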

4) HDFSWriteOperatorDescriptor: revised with a simpler constructor that does not require the client to configure output splits; they are now derived from the JobConf.
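
A minimal usage sketch (jobSpec, jobConf and numMapTasks are placeholders for the client's own objects):

    // output FileSplits are derived internally from the JobConf (output path plus the
    // number of reducers, or of mappers for a map-only job), so the client passes
    // neither splits nor key/value class names
    HDFSWriteOperatorDescriptor writer =
            new HDFSWriteOperatorDescriptor(jobSpec, jobConf, numMapTasks);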


git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@135 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
index e001150..a9538d0 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -14,93 +14,28 @@
  */
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 
-import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
 public abstract class AbstractHadoopFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
-    protected HadoopFileSplit[] splits;
-
-    public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec, HadoopFileSplit[] splits,
+    
+    public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec,
             RecordDescriptor recordDescriptor) {
         super(spec, 0, 1);
         recordDescriptors[0] = recordDescriptor;
-        this.splits = splits;
     }
 
-    protected abstract IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc)
+    protected abstract IRecordReader createRecordReader(InputSplit fileSplit, RecordDescriptor desc)
             throws Exception;
 
-    protected class FileScanOperator implements IOpenableDataWriterOperator {
-        private IOpenableDataWriter<Object[]> writer;
-        private int index;
-
-        FileScanOperator(int index) {
-            this.index = index;
-        }
-
-        @Override
-        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
-            if (index != 0) {
-                throw new IndexOutOfBoundsException("Invalid index: " + index);
-            }
-            this.writer = writer;
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            HadoopFileSplit split = splits[index];
-            RecordDescriptor desc = recordDescriptors[0];
-            try {
-                IRecordReader reader = createRecordReader(split, desc);
-                if (desc == null) {
-                    desc = recordDescriptors[0];
-                }
-                writer.open();
-                try {
-                    while (true) {
-                        Object[] record = new Object[desc.getFields().length];
-                        if (!reader.read(record)) {
-                            break;
-                        }
-                        writer.writeData(record);
-                    }
-                } finally {
-                    reader.close();
-                    writer.close();
-                }
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            // do nothing
-        }
-
-        @Override
-        public void writeData(Object[] data) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-    }
-
+    
     protected Reporter createReporter() {
         return new Reporter() {
             @Override
@@ -140,9 +75,5 @@
         };
     }
 
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), null);
-    }
+ 
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
index 5744600..9b054e4 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.io.Text;
@@ -66,7 +67,7 @@
     public static String MAPRED_CACHE_LOCALFILES = "mapred.cache.localFiles";
 
     private static final long serialVersionUID = 1L;
-    private final HashMap<String, String> jobConfMap;
+    private final Map<String, String> jobConfMap;
     private IHadoopClassFactory hadoopClassFactory;
     
     public abstract RecordDescriptor getRecordDescriptor(JobConf jobConf);
@@ -74,7 +75,7 @@
     public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
             IHadoopClassFactory hadoopOperatorFactory) {
         super(spec, 1, 1);
-        jobConfMap = DatatypeHelper.jobConf2HashMap(jobConf);
+        jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         this.hadoopClassFactory = hadoopOperatorFactory;
         if(recordDescriptor != null){
         	recordDescriptors[0]= recordDescriptor;
@@ -84,7 +85,7 @@
     }
 
      
-    public HashMap<String, String> getJobConfMap() {
+    public Map<String, String> getJobConfMap() {
 		return jobConfMap;
 	}
 
@@ -140,7 +141,7 @@
     }
 
     public JobConf getJobConf() {
-        return DatatypeHelper.hashMap2JobConf(jobConfMap);
+        return DatatypeHelper.map2JobConf(jobConfMap);
     }
     
     public void populateCache(JobConf jobConf) {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
index 1e94264..daaa5ea 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
@@ -17,15 +17,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +30,9 @@
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
@@ -48,129 +46,152 @@
 public class HDFSWriteOperatorDescriptor extends
 		AbstractFileWriteOperatorDescriptor {
 	
-	private static String nullWritableClassName = NullWritable.class.getName();
+    private static String nullWritableClassName = NullWritable.class.getName();
 	
-	private static class HDFSLineWriterImpl extends RecordWriter {
+    private static class HDFSWriter extends RecordWriter {
 		
-        HDFSLineWriterImpl(FileSystem fileSystem, String hdfsPath, int[] columns, char separator)
-                throws Exception {
-    		super(columns,separator,new Object[]{fileSystem,hdfsPath});
-        }
+       HDFSWriter(FileSystem fileSystem, String hdfsPath, int[] columns, char separator)
+           throws Exception {
+           super(columns,separator,new Object[]{fileSystem,hdfsPath});
+       }       
+    
 
-		@Override
-		public OutputStream createOutputStream(Object[] args) throws Exception {
-			FSDataOutputStream fs = ((FileSystem)args[0]).create(new Path((String)args[1]));
-			return fs;
-		}
+       @Override
+       public OutputStream createOutputStream(Object[] args) throws Exception {
+	       FSDataOutputStream fs = ((FileSystem)args[0]).create(new Path((String)args[1]));
+	       return fs;
+       }
 
-		 @Override
-	     public void write(Object[] record) throws Exception {
-	         if(!nullWritableClassName.equals((record[0].getClass().getName()))){
-	             bufferedWriter.write(String.valueOf(record[0]));
-	         }
-	         if(!nullWritableClassName.equals((record[1].getClass().getName()))){
-	        	  bufferedWriter.write(separator);	 
-	        	  bufferedWriter.write(String.valueOf(record[1]));
-	         }	 
-	         bufferedWriter.write("\n");
-	     }
+       @Override
+       public void write(Object[] record) throws Exception {
+          if(!nullWritableClassName.equals((record[0].getClass().getName()))){
+              bufferedWriter.write(String.valueOf(record[0]));
+          }
+          if(!nullWritableClassName.equals((record[1].getClass().getName()))){
+              bufferedWriter.write(separator);	 
+              bufferedWriter.write(String.valueOf(record[1]));
+          }	 
+          bufferedWriter.write("\n");
+       }
     }
 
-	private static class HDFSSequenceWriterImpl extends RecordWriter {
-		
-		private Writer writer;
-		
-		HDFSSequenceWriterImpl(FileSystem fileSystem, String hdfsPath, Writer writer)
-                throws Exception {
-    		super(null,COMMA,new Object[]{fileSystem,hdfsPath});
-    		this.writer = writer;
+    private static class HDFSSequenceWriter extends RecordWriter {
+	private Writer writer;
+	HDFSSequenceWriter(FileSystem fileSystem, String hdfsPath, Writer writer)
+            throws Exception {
+            super(null,COMMA,new Object[]{fileSystem,hdfsPath});
+    	    this.writer = writer;
         }
 
-		@Override
-		public OutputStream createOutputStream(Object[] args) throws Exception {
-			return null;
-		}
+	@Override
+	public OutputStream createOutputStream(Object[] args) throws Exception {
+		return null;
+	}
 		
-		@Override
-	     public void close() {
-	         try {
-	             writer.close();
-	         } catch (IOException e) {
-	             e.printStackTrace();
-	         }
-	     }
+	@Override
+        public void close() {
+            try {
+                writer.close();
+            } catch (IOException e) {
+	          e.printStackTrace();
+	    }
+        }
 
-	     @Override
-	     public void write(Object[] record) throws Exception {
-	         Object key = record[0];
-	         Object value = record[1];
-	         writer.append(key, value);
-	     }
-
+        @Override
+        public void write(Object[] record) throws Exception {
+             Object key = record[0];
+             Object value = record[1];
+             writer.append(key, value);
+        }
     }
 	
     private static final long serialVersionUID = 1L;
     private static final char COMMA = ',';
-	private char separator;
-	private boolean sequenceFileOutput = false;
-	private String keyClassName;
-	private String valueClassName;
-	Map<String,String> jobConfMap;
+    private char separator;
+    private boolean sequenceFileOutput = false;
+    Map<String,String> jobConfMap;
     
 
     @Override
     protected IRecordWriter createRecordWriter(File file,int index) throws Exception {
-    	JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap)jobConfMap);
-		System.out.println("replication:" + conf.get("dfs.replication"));
+    	JobConf conf = DatatypeHelper.map2JobConf((HashMap)jobConfMap);
+	System.out.println("replication:" + conf.get("dfs.replication"));
     	FileSystem fileSystem = null;
-		try{
-			fileSystem = FileSystem.get(conf);
-		}catch(IOException ioe){
-			ioe.printStackTrace();
-		}
-		Path path = new Path(file.getAbsolutePath());
-		checkIfCanWriteToHDFS(new FileSplit[]{new FileSplit("localhost",file)});
-		FSDataOutputStream outputStream = fileSystem.create(path);
-		outputStream.close();
-		if(sequenceFileOutput){
-			Class  keyClass = Class.forName(keyClassName);  
-			Class valueClass= Class.forName(valueClassName);
-			conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-			Writer writer = SequenceFile.createWriter(fileSystem, conf,path, keyClass, valueClass);
-			return new HDFSSequenceWriterImpl(fileSystem, file.getAbsolutePath(), writer);
-		}else{
-			return new HDFSLineWriterImpl(fileSystem, file.getAbsolutePath(), null, COMMA);
-		}	
+     	try {
+	    fileSystem = FileSystem.get(conf);
+	} catch (IOException ioe) {
+            ioe.printStackTrace();
+	}
+	Path path = new Path(file.getAbsolutePath());
+	checkIfCanWriteToHDFS(new FileSplit[]{new FileSplit("localhost",file)});
+	FSDataOutputStream outputStream = fileSystem.create(path);
+	outputStream.close();
+	if(sequenceFileOutput) {
+            Class keyClass = Class.forName(conf.getOutputKeyClass().getName());  
+	    Class valueClass= Class.forName(conf.getOutputValueClass().getName());
+            conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+   	    Writer writer = SequenceFile.createWriter(fileSystem, conf,path, keyClass, valueClass);
+	    return new HDFSSequenceWriter(fileSystem, file.getAbsolutePath(), writer);
+        } else {
+	    return new HDFSWriter(fileSystem, file.getAbsolutePath(), null, COMMA);
+        }	
     }
     
     private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception{
     	boolean canWrite = true;
-    	JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap)jobConfMap);
-		FileSystem fileSystem = null;
-		try{
-			fileSystem = FileSystem.get(conf);
-		    for(FileSplit fileSplit : fileSplits){
-				Path path = new Path(fileSplit.getLocalFile().getAbsolutePath());
-				canWrite = !fileSystem.exists(path);
-				if(!canWrite){
-					throw new Exception(" Output path :  already exists");
-				}	
-			}
-		}catch(IOException ioe){
-			ioe.printStackTrace();
-			throw ioe;
-		}
-	    return canWrite;
-    }
-		
-	
-	public HDFSWriteOperatorDescriptor(JobSpecification jobSpec,Map<String,String> jobConfMap, FileSplit[] fileSplits,char seperator,boolean sequenceFileOutput,String keyClassName, String valueClassName) throws Exception{
-		super(jobSpec,fileSplits);
-		this.jobConfMap = jobConfMap;
-		checkIfCanWriteToHDFS(fileSplits);
-		this.separator = seperator;
-		this.sequenceFileOutput = sequenceFileOutput;
-		this.keyClassName = keyClassName;
-		this.valueClassName = valueClassName;
+    	JobConf conf = DatatypeHelper.map2JobConf((HashMap)jobConfMap);
+	FileSystem fileSystem = null;
+	try {
+	    fileSystem = FileSystem.get(conf);
+	    for(FileSplit fileSplit : fileSplits) {
+		Path path = new Path(fileSplit.getLocalFile().getAbsolutePath());
+		canWrite = !fileSystem.exists(path);
+		if(!canWrite){
+	            throw new Exception(" Output path :  already exists");
+		}	
+            }
+	} catch(IOException ioe) {
+	    ioe.printStackTrace();
+	    throw ioe;
 	}
+        return canWrite;
+   }    
+
+    private static String getAbsolutePath(Path path) {
+          StringBuffer absolutePath = new StringBuffer();
+          List<String> ancestorPath = new ArrayList<String>();
+          Path pathElement=path;
+          while(pathElement != null) {
+                ancestorPath.add(0, pathElement.getName());
+             pathElement = pathElement.getParent();
+          }
+          ancestorPath.remove(0);        
+          for(String s : ancestorPath) {
+              absolutePath.append("/");
+              absolutePath.append(s);
+          }
+          return new String(absolutePath);
+     }
+
+     private static FileSplit[] getOutputSplits(JobConf conf,int noOfMappers){
+         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
+         FileSplit[] outputFileSplits = new FileSplit[numOutputters];
+         String absolutePath = getAbsolutePath(FileOutputFormat.getOutputPath(conf));
+         for(int index=0;index<numOutputters;index++) {
+             String suffix = new String("part-00000");
+             suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+             suffix = suffix + index;
+             String outputPath = absolutePath + "/" + suffix;
+             outputFileSplits[index] = new FileSplit("localhost",new File(outputPath));
+         }
+         return outputFileSplits;
+    }
+
+    public HDFSWriteOperatorDescriptor(JobSpecification jobSpec,JobConf jobConf, int numMapTasks) throws Exception{
+        super(jobSpec,getOutputSplits(jobConf,numMapTasks));
+        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
+        checkIfCanWriteToHDFS(super.splits);
+	    this.sequenceFileOutput = 
+	    (jobConf.getOutputFormat() instanceof SequenceFileOutputFormat);
+    }
 }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index e7b2714..e7fd9c1 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2010 University of California, Irvine
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -15,132 +15,271 @@
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.HadoopFileSplit;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitHandler;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
+import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
-public class HadoopReadOperatorDescriptor extends AbstractHadoopFileScanOperatorDescriptor {
+public class HadoopReadOperatorDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
-    private String inputFormatClassName;
-    private Map<String, String> jobConfMap;
+	protected transient InputSplit[] splits;
+	protected transient Path splitFilePath;
+	protected transient JobConf jobConf;
+	protected transient Reporter reporter;
 
-    private static class HDFSCustomReader implements IRecordReader {
-        private RecordReader hadoopRecordReader;
-        private Class inputKeyClass;
-        private Class inputValueClass;
-        private Object key;
-        private Object value;
+	private static final long serialVersionUID = 1L;
+	private String inputFormatClassName;
+	private Map<String, String> jobConfMap;
+	private static String splitDirectory = "/tmp/splits";
 
-        public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
-                String inputFormatClassName, Reporter reporter) {
-            try {
-                JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
-                FileSystem fileSystem = null;
-                try {
-                    fileSystem = FileSystem.get(conf);
-                } catch (IOException ioe) {
-                    ioe.printStackTrace();
-                }
+	private void initialize() {
+		try {
+			reporter = createReporter();
+			if (jobConf == null) {
+				jobConf = DatatypeHelper.map2JobConf(jobConfMap);
+			}
+			splits = InputSplitHandler.getInputSplits(jobConf, splitFilePath);
+		} catch (IOException ioe) {
+			ioe.printStackTrace();
+		}
+	}
 
-                Class inputFormatClass = Class.forName(inputFormatClassName);
-                InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-                hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
-                        reporter);
-                if (hadoopRecordReader instanceof SequenceFileRecordReader) {
-                    inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
-                    inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
-                } else {
-                    inputKeyClass = hadoopRecordReader.createKey().getClass();
-                    inputValueClass = hadoopRecordReader.createValue().getClass();
-                }
-                key = inputKeyClass.newInstance();
-                value = inputValueClass.newInstance();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
+	protected class FileScanOperator implements IOpenableDataWriterOperator {
+		private IOpenableDataWriter<Object[]> writer;
 
-        public Class getInputKeyClass() {
-            return inputKeyClass;
-        }
+		private int index;
 
-        public void setInputKeyClass(Class inputKeyClass) {
-            this.inputKeyClass = inputKeyClass;
-        }
+		FileScanOperator(int index) {
+			this.index = index;
+		}
 
-        public Class getInputValueClass() {
-            return inputValueClass;
-        }
+		@Override
+		public void setDataWriter(int index,
+				IOpenableDataWriter<Object[]> writer) {
+			if (index != 0) {
+				throw new IndexOutOfBoundsException("Invalid index: " + index);
+			}
+			this.writer = writer;
+		}
 
-        public void setInputValueClass(Class inputValueClass) {
-            this.inputValueClass = inputValueClass;
-        }
+		@Override
+		public void open() throws HyracksDataException {
+			if (splits == null) {
+				// initialize splits by reading from split file
+				initialize();
+			}
+			InputSplit split = splits[index];
+			RecordDescriptor desc = recordDescriptors[0];
+			try {
+				IRecordReader reader = createRecordReader(split, desc);
+				if (desc == null) {
+					desc = recordDescriptors[0];
+				}
+				writer.open();
+				try {
+					while (true) {
+						Object[] record = new Object[desc.getFields().length];
+						if (!reader.read(record)) {
+							break;
+						}
+						writer.writeData(record);
+					}
+				} finally {
+					reader.close();
+					writer.close();
+					splitFilePath = null;
+				}
+			} catch (Exception e) {
+				throw new HyracksDataException(e);
+			}
+		}
 
-        @Override
-        public void close() {
-            try {
-                hadoopRecordReader.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
+		@Override
+		public void close() throws HyracksDataException {
+			// do nothing
+			splitFilePath = null;
 
-        @Override
-        public boolean read(Object[] record) throws Exception {
-            if (!hadoopRecordReader.next(key, value)) {
-                return false;
-            }
-            if (record.length == 1) {
-                record[0] = value;
-            } else {
-                record[0] = key;
-                record[1] = value;
-            }
-            return true;
-        }
+		}
 
-        private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
-            FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
-                    hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
-            return fileSplit;
-        }
-    }
+		@Override
+		public void writeData(Object[] data) throws HyracksDataException {
+			throw new UnsupportedOperationException();
+		}
+	}
 
-    public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
-            HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
-        super(spec, splits, recordDescriptor);
-        this.inputFormatClassName = inputFormatClassName;
-        this.jobConfMap = jobConfMap;
-    }
+	protected class HDFSCustomReader implements IRecordReader {
+		private RecordReader hadoopRecordReader;
+		private Object key;
+		private Object value;
+		private FileSystem fileSystem;
 
-    public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
-            JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
-        super(spec, null, recordDescriptor);
-        this.inputFormatClassName = inputFormatClassName;
-        this.jobConfMap = jobConfMap;
-    }
+		public HDFSCustomReader(Map<String, String> jobConfMap,
+				InputSplit inputSplit, String inputFormatClassName,
+				Reporter reporter) {
+			try {
+				JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+				try {
+					fileSystem = FileSystem.get(conf);
+				} catch (IOException ioe) {
+					ioe.printStackTrace();
+				}
 
-    @Override
-    protected IRecordReader createRecordReader(HadoopFileSplit fileSplit, RecordDescriptor desc) throws Exception {
-        Reporter reporter = createReporter();
-        IRecordReader recordReader = new HDFSCustomReader(jobConfMap, fileSplit, inputFormatClassName, reporter);
-        return recordReader;
-    }
-}
\ No newline at end of file
+				Class inputFormatClass = Class.forName(inputFormatClassName);
+				InputFormat inputFormat = (InputFormat) ReflectionUtils
+						.newInstance(inputFormatClass, conf);
+				hadoopRecordReader = (RecordReader) inputFormat
+						.getRecordReader(inputSplit, conf, reporter);
+				Class inputKeyClass;
+				Class inputValueClass;
+				if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+					inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader)
+							.getKeyClass();
+					inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader)
+							.getValueClass();
+				} else {
+					inputKeyClass = hadoopRecordReader.createKey().getClass();
+					inputValueClass = hadoopRecordReader.createValue()
+							.getClass();
+				}
+				key = inputKeyClass.newInstance();
+				value = inputValueClass.newInstance();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public void close() {
+			try {
+				hadoopRecordReader.close();
+				if (fileSystem != null) {
+					fileSystem.delete(splitFilePath);
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public boolean read(Object[] record) throws Exception {
+			if (!hadoopRecordReader.next(key, value)) {
+				return false;
+			}
+			if (record.length == 1) {
+				record[0] = value;
+			} else {
+				record[0] = key;
+				record[1] = value;
+			}
+			return true;
+		}
+	}
+
+	public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec)
+			throws IOException {
+		super(spec, 0, 1);
+		this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
+		InputFormat inputFormat = jobConf.getInputFormat();
+		InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf
+				.getNumMapTasks());
+		RecordReader recordReader = inputFormat.getRecordReader(splits[0],
+				jobConf, createReporter());
+		super.recordDescriptors[0] = DatatypeHelper
+				.createKeyValueRecordDescriptor(
+						(Class<? extends Writable>) recordReader.createKey()
+								.getClass(),
+						(Class<? extends Writable>) recordReader.createValue()
+								.getClass());
+		String suffix = "" + System.currentTimeMillis();
+		splitFilePath = new Path(splitDirectory, suffix);
+		InputSplitHandler.writeSplitFile(splits, jobConf, splitFilePath);
+		this
+				.setPartitionConstraint(new PartitionCountConstraint(
+						splits.length));
+		this.inputFormatClassName = inputFormat.getClass().getName();
+	}
+
+	protected IRecordReader createRecordReader(InputSplit fileSplit,
+			RecordDescriptor desc) throws Exception {
+		IRecordReader recordReader = new HDFSCustomReader(jobConfMap,
+				fileSplit, inputFormatClassName, reporter);
+		return recordReader;
+	}
+
+	protected Reporter createReporter() {
+		return new Reporter() {
+			@Override
+			public Counter getCounter(Enum<?> name) {
+				return null;
+			}
+
+			@Override
+			public Counter getCounter(String group, String name) {
+				return null;
+			}
+
+			@Override
+			public InputSplit getInputSplit()
+					throws UnsupportedOperationException {
+				return null;
+			}
+
+			@Override
+			public void incrCounter(Enum<?> key, long amount) {
+
+			}
+
+			@Override
+			public void incrCounter(String group, String counter, long amount) {
+
+			}
+
+			@Override
+			public void progress() {
+
+			}
+
+			@Override
+			public void setStatus(String status) {
+
+			}
+		};
+	}
+
+	@Override
+	public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
+			IOperatorEnvironment env,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(
+				partition), null);
+	}
+
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
index 7eea853..23f7f6e 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
@@ -18,6 +18,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,7 +97,7 @@
         return new RecordDescriptor(fields);
     }
 
-    public static JobConf hashMap2JobConf(HashMap<String, String> jobConfMap) {
+    public static JobConf map2JobConf(Map<String, String> jobConfMap) {
         JobConf jobConf;
         synchronized (Configuration.class) {
             jobConf = new JobConf();
@@ -107,8 +108,8 @@
         return jobConf;
     }
 
-    public static HashMap<String, String> jobConf2HashMap(JobConf jobConf) {
-        HashMap<String, String> jobConfMap = new HashMap<String, String>();
+    public static Map<String, String> jobConf2Map(JobConf jobConf) {
+        Map<String, String> jobConfMap = new HashMap<String, String>();
         for (Entry<String, String> entry : jobConf) {
             jobConfMap.put(entry.getKey(), entry.getValue());
         }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
deleted file mode 100644
index 0c984bc..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopAdapter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
-
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-
-public class HadoopAdapter {
-
-	private static ClientProtocol namenode;
-	private static FileSystem fileSystem;
-	private static JobConf jobConf;
-	private static HadoopAdapter instance;
-	
-	public static final String DFS_DATA_DIR = "dfs.data.dir";
-	public static final String FS_DEFAULT_NAME = "fs.default.name";
-	public static final String DFS_REPLICATION = "dfs.replication";
-	
-	public static HadoopAdapter getInstance(String fileSystemURL){
-		if(instance == null){
-			jobConf = new JobConf(true);
-			String [] urlParts = parseURL(fileSystemURL);
-			jobConf.set(FS_DEFAULT_NAME, fileSystemURL);
-			instance = new HadoopAdapter(new InetSocketAddress(urlParts[1], Integer.parseInt(urlParts[2])));
-		}
-		return instance;
-	}
-	
-	public static JobConf getConf() {
-		return jobConf;
-	}
-
-	private HadoopAdapter (InetSocketAddress address){
-		try{
-			this.namenode = getNameNode(address);
-			fileSystem = FileSystem.get(jobConf);
-		}catch(IOException ioe){
-			ioe.printStackTrace();
-		}
-	}
-	
-	private static String[] parseURL(String urlString){
-		String[] urlComponents = urlString.split(":");
-		urlComponents[1] = urlComponents[1].substring(2);
-		return urlComponents;
-	}
-	
-	
-	public Map<String,List<HadoopFileSplit>> getInputSplits(String[] inputPaths){
-	    List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
-    	Path[] paths = new Path[inputPaths.length];
-    	int index =0;
-    	for(String inputPath : inputPaths){
-    		paths[index++] = new Path(StringUtils.unEscapeString(inputPaths[0]));
-    	}
-    	Map<String,List<HadoopFileSplit>> fileSplitInfo = getBlocksToRead(paths);
-    	return fileSplitInfo;
-	}
-	
-	private static Map<String,List<HadoopFileSplit>> getBlocksToRead(Path[] inputPaths){
-		Map<String,List<HadoopFileSplit>> hadoopFileSplitMap = new HashMap<String,List<HadoopFileSplit>>();
-		for (int i=0;i<inputPaths.length;i++){
-			try{
-				String absolutePathPrint = getAbsolutePath(inputPaths[i]);
-				FileStatus[] fileStatuses = namenode.getListing(absolutePathPrint);
-				for(int j=0;j<fileStatuses.length;j++){
-			    	Path filePath = fileStatuses[j].getPath();
-			    	String absolutePath = getAbsolutePath(filePath);
-			    	List<HadoopFileSplit> fileSplits = getFileBlocks(absolutePath,fileStatuses[j]);
-			    	if(fileSplits!=null && fileSplits.size() > 0){
-			    		hadoopFileSplitMap.put(absolutePath, fileSplits);
-			    	}	
-			    }		
-			   }catch(IOException ioe){
-				ioe.printStackTrace();
-		    }
-			
-		}
-		return hadoopFileSplitMap;
-	}
-	
-	private static ClientProtocol getNameNode(InetSocketAddress address) throws IOException{
-		return (ClientProtocol)getProtocol(ClientProtocol.class, address, new JobConf());
-	}
-	
-	private static String getAbsolutePath(Path path){
-		StringBuffer absolutePath = new StringBuffer();
-		List<String> ancestorPath = new ArrayList<String>();
-		Path pathElement=path;
-		while(pathElement != null){
-			ancestorPath.add(0, pathElement.getName());
-			pathElement = pathElement.getParent();
-		}
-		ancestorPath.remove(0);
-		for(String s : ancestorPath){
-			absolutePath.append("/");
-			absolutePath.append(s);
-		}
-		return new String(absolutePath);
-	}
-	
-	private static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf) throws IOException{
-		VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress, jobConf);	
-		return versionedProtocol;
-	}
-	
-	private static List<HadoopFileSplit> getFileBlocks(String absolutePath,FileStatus fileStatus){
-		List<HadoopFileSplit> hadoopFileSplits = new ArrayList<HadoopFileSplit>();
-		try{
-			LocatedBlocks locatedBlocks = namenode.getBlockLocations(absolutePath, 0, fileStatus.getLen());
-			long blockSize = fileSystem.getBlockSize(new Path(absolutePath));
-			if(locatedBlocks !=null){
-	    		int index = 0;
-				for(LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()){
-					DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); // all datanodes having this block
-					String [] hostnames = new String[datanodeInfos.length];
-					int datanodeCount =0;
-					for(DatanodeInfo datanodeInfo : datanodeInfos){
-						hostnames[datanodeCount++] = datanodeInfo.getHostName();
-					}	
-					HadoopFileSplit hadoopFileSplit = new HadoopFileSplit(absolutePath,new Long(index * blockSize).longValue(),new Long(blockSize).longValue(),hostnames);
-			    	hadoopFileSplits.add(hadoopFileSplit);
-			    	index++;
-			    	}
-	    	}	
-		}catch(Exception e){
-			e.printStackTrace();
-		}
-		return hadoopFileSplits;
-	}
-}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
deleted file mode 100644
index 6e7b76c..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/HadoopFileSplit.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop.util;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.fs.Path;
-
-public class HadoopFileSplit implements Serializable{
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-	private String filePath;
-    private long start;
-	private long length;
-    private String[] hosts;
-    
-    public HadoopFileSplit(String filePath, long start, long length, String[] hosts){
-    	this.filePath = filePath;
-    	this.start = start;
-    	this.length = length;
-    	this.hosts = hosts;
-    }
-
-	public String getFile() {
-		return filePath;
-	}
-
-	public void setFile(String file) {
-		this.filePath = file;
-	}
-
-	public long getStart() {
-		return start;
-	}
-
-	public void setStart(long start) {
-		this.start = start;
-	}
-
-	public long getLength() {
-		return length;
-	}
-
-	public void setLength(long length) {
-		this.length = length;
-	}
-
-	public String[] getHosts() {
-		return hosts;
-	}
-
-	public void setHosts(String[] hosts) {
-		this.hosts = hosts;
-	}
-	
-	public String toString(){
-		StringBuilder stringBuilder = new StringBuilder();
-		stringBuilder.append(filePath + " " + start + " " + length +  "\n");
-		for(String host : hosts){
-			stringBuilder.append(host);
-			stringBuilder.append(",");
-		}
-		return new String(stringBuilder);
-	}
-}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
new file mode 100644
index 0000000..e6f5395
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+
+public class InputSplitHandler {
+
+	private static final int CURRENT_SPLIT_FILE_VERSION = 0;
+    private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
+    
+    public static InputSplit[] getInputSplits(JobConf jobConf, Path splitFilePath) throws IOException {
+    	RawSplit [] rawSplits = RawSplit.readSplitsFile(jobConf, splitFilePath);
+    	InputSplit [] inputSplits = new InputSplit[rawSplits.length];
+    	for(int i=0;i<rawSplits.length;i++){
+    		inputSplits[i] = getInputSplit(jobConf, rawSplits[i]);
+    	}
+    	return inputSplits;
+    }
+    
+    public static void writeSplitFile(InputSplit[] inputSplits, JobConf jobConf,Path splitFilePath)
+                                             throws IOException {
+    	RawSplit.writeSplits(inputSplits ,jobConf, splitFilePath);
+    }
+    
+    private static InputSplit getInputSplit(JobConf jobConf, RawSplit rawSplit) throws IOException{
+		InputSplit inputSplit = null;
+		String splitClass = rawSplit.getClassName();
+		BytesWritable split  = rawSplit.getBytes();  
+		try {
+		     inputSplit = (InputSplit) 
+		     ReflectionUtils.newInstance(jobConf.getClassByName(splitClass), jobConf);
+		} catch (ClassNotFoundException exp) {
+		     IOException wrap = new IOException("Split class " + splitClass + 
+		                                         " not found");
+		     wrap.initCause(exp);
+		     throw wrap;
+	    }
+		DataInputBuffer splitBuffer = new DataInputBuffer();
+		splitBuffer.reset(split.getBytes(), 0, split.getLength());
+		inputSplit.readFields(splitBuffer);
+		return inputSplit;
+	}
+    
+    protected static class RawSplit implements Writable {
+	    private String splitClass;
+	    private BytesWritable bytes = new BytesWritable();
+	    private String[] locations;
+	    long dataLength;
+
+	    public void setBytes(byte[] data, int offset, int length) {
+	      bytes.set(data, offset, length);
+	    }
+
+		public void setClassName(String className) {
+		   splitClass = className;
+		}
+		      
+		public String getClassName() {
+		   return splitClass;
+		}
+		      
+		public BytesWritable getBytes() {
+		   return bytes;
+		}
+
+		public void clearBytes() {
+		    bytes = null;
+		}
+		      
+		public void setLocations(String[] locations) {
+		   this.locations = locations;
+		}
+		      
+		public String[] getLocations() {
+		   return locations;
+		}
+		      
+		public void readFields(DataInput in) throws IOException {
+		   splitClass = Text.readString(in);
+		   dataLength = in.readLong();
+		   bytes.readFields(in);
+		   int len = WritableUtils.readVInt(in);
+		   locations = new String[len];
+		   for(int i=0; i < len; ++i) {
+		       locations[i] = Text.readString(in);
+		   }
+        }
+		      
+		public void write(DataOutput out) throws IOException {
+		   Text.writeString(out, splitClass);
+		   out.writeLong(dataLength);
+		   bytes.write(out);
+		   WritableUtils.writeVInt(out, locations.length);
+		  for(int i = 0; i < locations.length; i++) {
+		     Text.writeString(out, locations[i]);
+		  }        
+		}
+
+        public long getDataLength() {
+	       return dataLength;
+	    }
+	
+        public void setDataLength(long l) {
+		    dataLength = l;
+		}
+		    
+		public static RawSplit[] readSplitsFile(JobConf conf, Path splitFilePath) throws IOException{
+		    FileSystem fs = FileSystem.get(conf);
+		   	DataInputStream splitFile =
+		    fs.open(splitFilePath);
+		    try {
+		    	byte[] header = new byte[SPLIT_FILE_HEADER.length];
+		    	splitFile.readFully(header);
+		        if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
+		          throw new IOException("Invalid header on split file");
+		        }
+		        int vers = WritableUtils.readVInt(splitFile);
+		        if (vers != CURRENT_SPLIT_FILE_VERSION) {
+		          throw new IOException("Unsupported split version " + vers);
+		        }
+		        int len = WritableUtils.readVInt(splitFile);
+		        RawSplit[] result = new RawSplit[len];
+		        for(int i=0; i < len; ++i) {
+		          result[i] = new RawSplit();
+		          result[i].readFields(splitFile);
+		        }
+		        return result;
+		    	
+		    } finally {
+		      splitFile.close();
+		    }
+		   }
+		   
+		  public static int writeSplits(InputSplit[] splits, JobConf job, 
+		            Path submitSplitFile) throws IOException {
+			// sort the splits into order based on size, so that the biggest
+			// go first
+			Arrays.sort(splits, new Comparator<InputSplit>() {
+				public int compare(InputSplit a, InputSplit b) {
+				    try {
+					   long left = a.getLength();	
+						
+					   long right = b.getLength();
+					   if (left == right) {
+						   return 0;
+					   } else if (left < right) {
+						   return 1;
+					   } else {
+						   return -1;
+					   }
+				    } catch (IOException ie) {
+				    	throw new RuntimeException("Problem getting input split size",
+				                     ie);
+				    }
+			    }
+			});
+			DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
+			try {
+				DataOutputBuffer buffer = new DataOutputBuffer();
+				RawSplit rawSplit = new RawSplit();
+				for(InputSplit split: splits) {
+					rawSplit.setClassName(split.getClass().getName());
+					buffer.reset();
+				    split.write(buffer);
+				    rawSplit.setDataLength(split.getLength());
+				    rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+				    rawSplit.setLocations(split.getLocations());
+				    rawSplit.write(out);
+				}
+			} finally {
+			out.close();
+			}
+			return splits.length;
+		}
+
+		private static DataOutputStream writeSplitsFileHeader(Configuration conf,
+		                                                      Path filename,
+		                                                      int length
+		                                                     ) throws IOException {
+		   	FileSystem fs = filename.getFileSystem(conf);
+		   	FSDataOutputStream out = fs.create(filename);
+		   	out.write(SPLIT_FILE_HEADER);
+		   	WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+		   	WritableUtils.writeVInt(out, length);
+		   	return out;
+	    }
+	 }  
+
+}