Hadoop operators currently do not support 'org.apache.hadoop.mapreduce.*' types and hence cannot run MR jobs that reference those types. To be compatible, we need to support them.
> This change adds support for the mapreduce libraries. The changes are spread across all Hadoop operators. The compatibility layer also changes in order to support the mapreduce package.
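
At the core of the change, each operator dispatches on JobConf.getUseNewMapper() to decide whether to drive the old mapred API or the new mapreduce API. The following is a minimal, self-contained sketch of that dispatch for obtaining a record reader; the class name RecordReaderDispatchSketch and the use of Reporter.NULL are illustrative and not part of this patch.

    import java.io.IOException;

    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.util.ReflectionUtils;

    public class RecordReaderDispatchSketch {
        // Returns either an org.apache.hadoop.mapred.RecordReader or an
        // org.apache.hadoop.mapreduce.RecordReader, depending on which API the job uses.
        public static Object getRecordReader(JobConf conf, Object split)
                throws IOException, InterruptedException, ClassNotFoundException {
            if (conf.getUseNewMapper()) {
                // New API: instantiate the mapreduce InputFormat and create the reader
                // with a TaskAttemptContext (Hadoop 0.20-era constructors, as in the patch).
                JobContext context = new JobContext(conf, null);
                org.apache.hadoop.mapreduce.InputFormat inputFormat =
                        (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(
                                context.getInputFormatClass(), conf);
                TaskAttemptContext taskContext = new TaskAttemptContext(conf, new TaskAttemptID());
                return inputFormat.createRecordReader(
                        (org.apache.hadoop.mapreduce.InputSplit) split, taskContext);
            } else {
                // Old API: the JobConf already carries its mapred InputFormat.
                InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(
                        conf.getInputFormat().getClass(), conf);
                return inputFormat.getRecordReader(
                        (org.apache.hadoop.mapred.InputSplit) split, conf, Reporter.NULL);
            }
        }
    }

Note that a new-API reader obtained this way must still be initialized with the split and a TaskAttemptContext before iterating, which is what the mapper and read operators in this patch do.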

git-svn-id: https://hyracks.googlecode.com/svn/trunk@184 123451ca-8445-de46-9d55-352943316053
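
The compatibility layer (InputSplitsProxy) now ships splits from either API by serializing them as Writables together with their concrete classes. Below is a minimal sketch of that idea, assuming the concrete split classes implement Writable (org.apache.hadoop.mapred splits do, as does the common org.apache.hadoop.mapreduce.lib.input.FileSplit); the class SplitSerializationSketch is illustrative, not part of the patch.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SplitSerializationSketch {
        // Serialize splits of either API into a byte[]; their classes travel alongside.
        public static byte[] writeSplits(Object[] splits) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            for (Object split : splits) {
                ((Writable) split).write(dos); // works for both APIs' concrete split classes
            }
            dos.close();
            return baos.toByteArray();
        }

        // Rebuild the splits on the remote side from their classes and bytes.
        public static Object[] readSplits(Class[] splitClasses, byte[] bytes, JobConf conf)
                throws IOException {
            Object[] splits = new Object[splitClasses.length];
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
            for (int i = 0; i < splits.length; ++i) {
                splits[i] = ReflectionUtils.newInstance(splitClasses[i], conf);
                ((Writable) splits[i]).readFields(dis);
            }
            return splits;
        }
    }

Because the proxy can no longer commit to a single InputSplit type, the operators exchange splits as Object[] and cast to the appropriate API at the point of use.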
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index bbe8689..08df437 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -15,24 +15,23 @@
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
 
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -41,9 +40,7 @@
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
@@ -147,11 +144,15 @@
                 if (inputSplits == null) {
                     inputSplits = inputSplitsProxy.toInputSplits(conf);
                 }
-                InputSplit splitRead = inputSplits[partition];
+                Object splitRead = inputSplits[partition];
                 if (splitRead instanceof FileSplit) {
                     conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
                     conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
                     conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
+                } else if ( splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+                    conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath().toString());
+                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -161,44 +162,52 @@
             }
         }
 
-        public void mapInput() throws HyracksDataException {
+        public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
             try {
                 Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 initializeMapper();
                 updateConfWithSplit(conf);
                 mapper.configure(conf);
                 conf.setClassLoader(this.getClass().getClassLoader());
-                RecordReader hadoopRecordReader;
-                Object key;
-                Object value;
-                InputSplit inputSplit = inputSplits[partition];
-                Class inputFormatClass = conf.getInputFormat().getClass();
-                InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-                hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
-                Class inputKeyClass;
-                Class inputValueClass;
-                if (hadoopRecordReader instanceof SequenceFileRecordReader) {
-                    inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
-                    inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
-                } else {
-                    inputKeyClass = hadoopRecordReader.createKey().getClass();
-                    inputValueClass = hadoopRecordReader.createValue().getClass();
-                }
-
-                key = hadoopRecordReader.createKey();
-                value = hadoopRecordReader.createValue();
-                RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
-                        (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
-                int nFields = outputRecordDescriptor.getFields().length;
-                ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+                Object reader;
+                Object key = null;
+                Object value = null;
+                Object inputSplit = inputSplits[partition];
+                reader = getRecordReader(conf, inputSplit);
+                Class inputKeyClass = null;
+                Class inputValueClass = null;
                 Object[] data = new Object[2];
-                while (hadoopRecordReader.next(key, value)) {
-                    data[0] = key;
-                    data[1] = value;
-                    super.map(data);
+                  
+                if(conf.getUseNewMapper()){
+                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
+                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplit, new TaskAttemptContext(conf, new TaskAttemptID()));
+                    while(newReader.nextKeyValue()){
+                        data[0] = newReader.getCurrentKey();
+                        if(data[0] == null) {
+                            data[0] = NullWritable.get();
+                        }
+                        data[1] = newReader.getCurrentValue();
+                        super.map(data);
+                    }
+                } else {
+                    RecordReader oldReader = (RecordReader)reader;
+                    if (reader instanceof SequenceFileRecordReader) {
+                        inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass();
+                        inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass();
+                    } else {
+                        inputKeyClass = oldReader.createKey().getClass();
+                        inputValueClass = oldReader.createValue().getClass();
+                    }
+                    key = oldReader.createKey();
+                    value = oldReader.createValue();
+                    while (oldReader.next(key, value)) {
+                        data[0] = key;
+                        data[1] = value;
+                        super.map(data);
+                    }
+                    oldReader.close();
                 }
-                hadoopRecordReader.close();
+                
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -214,13 +223,13 @@
     private static final long serialVersionUID = 1L;
     private Class<? extends Mapper> mapperClass;
     private InputSplitsProxy inputSplitsProxy;
-    private transient InputSplit[] inputSplits;
+    private transient Object[] inputSplits;
     private boolean selfRead = false;
 
-    private void initializeSplitInfo(InputSplit[] splits) throws IOException {
+    private void initializeSplitInfo(Object[] splits) throws IOException {
         jobConf = super.getJobConf();
         InputFormat inputFormat = jobConf.getInputFormat();
-        inputSplitsProxy = new InputSplitsProxy(splits);
+        inputSplitsProxy = new InputSplitsProxy(jobConf,splits);
     }
 
     public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
@@ -228,7 +237,7 @@
         super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
     }
 
-    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, InputSplit[] splits,
+    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, Object[] splits,
             IHadoopClassFactory hadoopClassFactory) throws IOException {
         super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
         initializeSplitInfo(splits);
@@ -266,6 +275,19 @@
         }
     }
 
+    private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, InterruptedException {
+        if(conf.getUseNewMapper()){
+            JobContext context = new JobContext(conf,null);
+            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+            TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf,new TaskAttemptID());
+            return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+        } else {
+            Class inputFormatClass = conf.getInputFormat().getClass();
+            InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, super.createReporter());
+        }
+    }
+    
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
@@ -278,10 +300,23 @@
                 if (inputSplits == null) {
                     inputSplits = inputSplitsProxy.toInputSplits(conf);
                 }
-                RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
-                        super.createReporter());
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
-                        .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
+                Object reader = getRecordReader(conf,inputSplits[partition]);
+                if(conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
+                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplits[partition], new TaskAttemptContext(conf, new TaskAttemptID()));
+                    newReader.nextKeyValue();
+                    Object key = newReader.getCurrentKey();
+                    Class keyClass = null;
+                    if (key == null) {
+                        keyClass = Class.forName("org.apache.hadoop.io.NullWritable");
+                    }
+                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) keyClass,
+                            (Class<? extends Writable>) newReader.getCurrentValue().getClass());
+                } else {
+                RecordReader oldReader = (RecordReader)reader;    
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) oldReader
+                        .createKey().getClass(), (Class<? extends Writable>) oldReader.createValue().getClass());
+                }
                 return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
             } else {
                 return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
@@ -299,7 +334,11 @@
             public void initialize() throws HyracksDataException {
                 SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
                 ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
-                readMapOp.mapInput();
+                try {
+                    readMapOp.mapInput();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                } 
                 readMapOp.close();
             }
         };
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 428bafe..93ef1ec 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
@@ -60,35 +62,40 @@
         return jobConf;
     }
 
-    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, InputSplit[] splits) throws IOException {
+    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, Object[] splits) throws IOException {
         super(spec, 0, 1);
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         InputFormat inputFormat = jobConf.getInputFormat();
-        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
+        RecordReader recordReader;
+        try {
+            recordReader = getRecordReader(DatatypeHelper.map2JobConf(jobConfMap), splits[0]);
+        } catch (Exception e) {
+            throw new IOException(e);
+        } 
         recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
                 .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
         this.setPartitionConstraint(new PartitionCountConstraint(splits.length));
-        inputSplitsProxy = new InputSplitsProxy(splits);
+        inputSplitsProxy = new InputSplitsProxy(jobConf,splits);
         this.inputFormatClassName = inputFormat.getClass().getName();
-        try {
-            checkSplits(jobConf);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
     }
 
-    public InputSplit[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
-        return inputSplitsProxy.toInputSplits(getJobConf());
-    }
-
-    private void checkSplits(JobConf conf) throws Exception {
-        InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
-        for (InputSplit inputSplit : splits) {
-            Class inputFormatClass = Class.forName(inputFormatClassName);
+    private RecordReader getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, InterruptedException {
+        RecordReader hadoopRecordReader = null;
+        if(conf.getUseNewMapper()){
+            JobContext context = new JobContext(conf,null);
+            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+            TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf, null);
+            hadoopRecordReader = (RecordReader) inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+        } else {
+            Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            RecordReader hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf,
-                    createReporter());
+            hadoopRecordReader = (RecordReader)inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, createReporter());
         }
+        return hadoopRecordReader;
+    }
+    
+    public Object[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+        return inputSplitsProxy.toInputSplits(getJobConf());
     }
 
     protected Reporter createReporter() {
@@ -145,11 +152,20 @@
                     RecordReader hadoopRecordReader;
                     Object key;
                     Object value;
-                    InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
-                    InputSplit inputSplit = splits[partition];
-                    Class inputFormatClass = Class.forName(inputFormatClassName);
-                    InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-                    hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
+                    Object[] splits = inputSplitsProxy.toInputSplits(conf);
+                    Object inputSplit = splits[partition];
+                    
+                    if(conf.getUseNewMapper()){
+                        JobContext context = new JobContext(conf,null);
+                        org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+                        TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf, null);
+                        hadoopRecordReader = (RecordReader) inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+                    } else {
+                        Class inputFormatClass = conf.getInputFormat().getClass();
+                        InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+                        hadoopRecordReader = (RecordReader)inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, createReporter());
+                    }
+                    
                     Class inputKeyClass;
                     Class inputValueClass;
                     if (hadoopRecordReader instanceof SequenceFileRecordReader) {
@@ -197,6 +213,8 @@
                     throw new HyracksDataException(e);
                 } catch (ClassNotFoundException e) {
                     throw new HyracksDataException(e);
+                } catch (InterruptedException e){
+                    throw new HyracksDataException(e);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 728929a..b7d3296 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
@@ -229,8 +230,18 @@
     }
 
     public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
-        String outputKeyClassName = conf.getOutputKeyClass().getName();
-        String outputValueClassName = conf.getOutputValueClass().getName();
+        String outputKeyClassName =null; 
+        String outputValueClassName = null;
+        
+        if(conf.getUseNewMapper()) {
+            JobContext context = new JobContext(conf,null);
+            outputKeyClassName = context.getOutputKeyClass().getName();
+            outputValueClassName = context.getOutputValueClass().getName();
+        } else {
+            outputKeyClassName = conf.getOutputKeyClass().getName();
+            outputValueClassName = conf.getOutputValueClass().getName();
+        }
+        
         RecordDescriptor recordDescriptor = null;
         try {
             if (classFactory == null) {
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index a2dac50..24103c6 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -29,10 +29,19 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
@@ -44,42 +53,45 @@
 
     private static String nullWritableClassName = NullWritable.class.getName();
 
-    private static class HDFSWriter extends RecordWriter {
+    private static class HadoopFileWriter implements IRecordWriter {
 
-        HDFSWriter(FileSystem fileSystem, String hdfsPath, int[] columns, char separator) throws Exception {
-            super(columns, separator, new Object[] { fileSystem, hdfsPath });
-        }
-
-        @Override
-        public OutputStream createOutputStream(Object[] args) throws Exception {
-            FSDataOutputStream fs = ((FileSystem) args[0]).create(new Path((String) args[1]));
-            return fs;
+        Object recordWriter;
+        JobConf conf;
+        final boolean useNewMapReduceLib;
+        HadoopFileWriter(Object recordWriter,JobConf conf) {
+            this.recordWriter = recordWriter;
+            this.conf = conf;
+            useNewMapReduceLib = conf.getUseNewMapper();
         }
 
         @Override
         public void write(Object[] record) throws Exception {
-            if (!nullWritableClassName.equals((record[0].getClass().getName()))) {
-                bufferedWriter.write(String.valueOf(record[0]));
-            }
-            if (!nullWritableClassName.equals((record[1].getClass().getName()))) {
-                bufferedWriter.write(separator);
-                bufferedWriter.write(String.valueOf(record[1]));
-            }
-            bufferedWriter.write("\n");
-        }
-    }
-
-    private static class HDFSSequenceWriter extends RecordWriter {
-        private Writer writer;
-
-        HDFSSequenceWriter(FileSystem fileSystem, String hdfsPath, Writer writer) throws Exception {
-            super(null, COMMA, new Object[] { fileSystem, hdfsPath });
-            this.writer = writer;
+            if (useNewMapReduceLib){
+                ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+            } else {
+                ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+            }    
         }
 
         @Override
-        public OutputStream createOutputStream(Object[] args) throws Exception {
-            return null;
+        public void close() {
+            try {
+                if (useNewMapReduceLib){
+                   ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+                } else {
+                    ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static class HadoopSequenceWriter implements IRecordWriter {
+        private Writer writer;
+
+        HadoopSequenceWriter(Writer writer) throws Exception {
+            this.writer = writer;
         }
 
         @Override
@@ -100,45 +112,100 @@
     }
 
     private static final long serialVersionUID = 1L;
-    private static final char COMMA = ',';
-    private boolean sequenceFileOutput = false;
     Map<String, String> jobConfMap;
 
     @Override
-    protected IRecordWriter createRecordWriter(File file, int index) throws Exception {
+    protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
         conf.setClassLoader(this.getClass().getClassLoader());
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         FileSystem fileSystem = null;
         try {
             fileSystem = FileSystem.get(conf);
         } catch (IOException ioe) {
             ioe.printStackTrace();
         }
-        Path path = new Path(file.getPath());
-        checkIfCanWriteToHDFS(new FileSplit[] { new FileSplit("localhost", file) });
-        FSDataOutputStream outputStream = fileSystem.create(path);
-        outputStream.close();
-        if (sequenceFileOutput) {
+        Path path = new Path(fileSplit.getPath());
+        checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
+        Object recordWriter  = null;
+        boolean sequenceFileOutput = false;
+        Object outputFormat = null;
+        if(conf.getUseNewMapper()){
+            org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
+            if(newOutputFormat instanceof org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat) {
+                 sequenceFileOutput = true;
+            }else{
+                recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
+            }    
+        }else {
+            if(conf.getOutputFormat() instanceof SequenceFileOutputFormat) {
+                sequenceFileOutput = true;
+            } else {
+                recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, "" + System.currentTimeMillis(), new Progressable() {
+                @Override
+                public void progress() {}
+                });
+            }
+        }
+        if(!sequenceFileOutput) {
+            return new HadoopFileWriter(recordWriter,conf);
+        } else { 
             Class keyClass = Class.forName(conf.getOutputKeyClass().getName());
             Class valueClass = Class.forName(conf.getOutputValueClass().getName());
             conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
             Writer writer = SequenceFile.createWriter(fileSystem, conf, path, keyClass, valueClass);
-            return new HDFSSequenceWriter(fileSystem, file.getAbsolutePath(), writer);
-        } else {
-            return new HDFSWriter(fileSystem, file.getPath(), null, COMMA);
-        }
+            return new HadoopSequenceWriter(writer);    
+        }   
     }
+    
+
+
+    protected Reporter createReporter() {
+    return new Reporter() {
+        @Override
+        public Counter getCounter(Enum<?> name) {
+            return null;
+        }
+
+        @Override
+        public Counter getCounter(String group, String name) {
+            return null;
+        }
+
+        @Override
+        public InputSplit getInputSplit() throws UnsupportedOperationException {
+            return null;
+        }
+
+        @Override
+        public void incrCounter(Enum<?> key, long amount) {
+
+        }
+
+        @Override
+        public void incrCounter(String group, String counter, long amount) {
+
+        }
+
+        @Override
+        public void progress() {
+
+        }
+
+        @Override
+        public void setStatus(String status) {
+
+        }
+    };
+}
 
     private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
-        boolean canWrite = true;
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
-        FileSystem fileSystem = null;
         try {
-            fileSystem = FileSystem.get(conf);
+            FileSystem fileSystem = FileSystem.get(conf);
             for (FileSplit fileSplit : fileSplits) {
-                Path path = new Path(fileSplit.getLocalFile().getPath());
-                canWrite = !fileSystem.exists(path);
-                if (!canWrite) {
+                Path path = new Path(fileSplit.getPath());
+                if (fileSystem.exists(path)) {
                     throw new Exception(" Output path :  already exists : " + path);
                 }
             }
@@ -146,12 +213,18 @@
             ioe.printStackTrace();
             throw ioe;
         }
-        return canWrite;
+        return true;
     }
 
-    private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) {
+    private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
-        if (conf.getOutputFormat() instanceof NullOutputFormat) {
+        Object outputFormat = null;
+        if(conf.getUseNewMapper()) {
+            outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
+        } else {
+            outputFormat = conf.getOutputFormat();
+        }
+        if (outputFormat instanceof NullOutputFormat) {
             FileSplit[] outputFileSplits = new FileSplit[numOutputters];
             for (int i = 0; i < numOutputters; i++) {
                 String outputPath = "/tmp/" + System.currentTimeMillis() + i;
@@ -162,12 +235,14 @@
 
             FileSplit[] outputFileSplits = new FileSplit[numOutputters];
             String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
+            System.out.println("absolute path:" + absolutePath);
             for (int index = 0; index < numOutputters; index++) {
                 String suffix = new String("part-00000");
                 suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
                 suffix = suffix + index;
                 String outputPath = absolutePath + "/" + suffix;
-                outputFileSplits[index] = new FileSplit("localhost", new File(outputPath));
+                System.out.println("output path :" + outputPath);
+                outputFileSplits[index] = new FileSplit("localhost", outputPath);
             }
             return outputFileSplits;
         }
@@ -178,6 +253,5 @@
         super(jobSpec, getOutputSplits(jobConf, numMapTasks));
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         checkIfCanWriteToHDFS(super.splits);
-        this.sequenceFileOutput = (jobConf.getOutputFormat() instanceof SequenceFileOutputFormat);
     }
 }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index c685724..29239e3 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -28,28 +29,40 @@
 public class InputSplitsProxy implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final Class<? extends InputSplit>[] isClasses;
+    private final Class[] isClasses;
     private final byte[] bytes;
 
-    public InputSplitsProxy(InputSplit[] inputSplits) throws IOException {
+    public InputSplitsProxy(JobConf conf, Object[] inputSplits) throws IOException {
         isClasses = new Class[inputSplits.length];
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
-        for (int i = 0; i < inputSplits.length; ++i) {
-            isClasses[i] = inputSplits[i].getClass();
-            inputSplits[i].write(dos);
+        if(conf.getUseNewMapper()){
+            for (int i = 0; i < inputSplits.length; ++i) {
+                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
+                ((Writable)inputSplits[i]).write(dos);
+            }
+        } else {
+            for (int i = 0; i < inputSplits.length; ++i) {
+                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
+                ((Writable)inputSplits[i]).write(dos);
+            }
         }
         dos.close();
         bytes = baos.toByteArray();
+
     }
 
-    public InputSplit[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
+    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
             IOException {
-        InputSplit[] splits = new InputSplit[isClasses.length];
+        Object[] splits = new Object[isClasses.length];
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
         for (int i = 0; i < splits.length; ++i) {
             splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
-            splits[i].readFields(dis);
+            if(jobConf.getUseNewMapper()){
+                ((Writable)splits[i]).readFields(dis);
+            }else {
+                ((Writable)splits[i]).readFields(dis);
+            }    
         }
         return splits;
     }