Made changes to support org.apache.hadoop.mapreduce library in addition to org.apache.hadoop.mapred library. The new library is used in Hadoop client community, notably in Pig and Mahout. To be compatible with hadoop, this change is mandatory

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@187 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 08df437..7efceef 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -26,7 +25,11 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -39,7 +42,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
@@ -53,24 +55,30 @@
     private class MapperBaseOperator {
         protected OutputCollector<K2, V2> output;
         protected Reporter reporter;
-        protected Mapper<K1, V1, K2, V2> mapper;
+        protected Object mapper;
+        //protected Mapper<K1, V1, K2, V2> mapper;
         protected int partition;
         protected JobConf conf;
         protected IOpenableDataWriter<Object[]> writer;
+        protected boolean newMapreduceLib = false;
+        org.apache.hadoop.mapreduce.Mapper.Context context;
 
         public MapperBaseOperator(int partition) {
             this.partition = partition;
         }
 
         protected void initializeMapper() throws HyracksDataException {
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             jobConf = getJobConf();
             populateCache(jobConf);
-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             try {
                 mapper = createMapper();
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
+            if (!jobConf.getUseNewMapper()) {
+                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+            }
             conf = new JobConf(jobConf);
             conf.setClassLoader(jobConf.getClassLoader());
             reporter = createReporter();
@@ -78,7 +86,11 @@
 
         protected void map(Object[] data) throws HyracksDataException {
             try {
-                mapper.map((K1) data[0], (V1) data[1], output, reporter);
+                if (!conf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapred.Mapper) mapper).map((K1) data[0], (V1) data[1], output, reporter);
+                } else
+                    throw new IllegalStateException(
+                            " Incorrect map method called for MapReduce code written using mapreduce package");
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             } catch (RuntimeException re) {
@@ -89,7 +101,11 @@
 
         protected void closeMapper() throws HyracksDataException {
             try {
-                mapper.close();
+                if (!conf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapred.Mapper) mapper).close();
+                } else {
+                    // do nothing. closing the mapper is handled internally by run method on context. 
+                }
             } catch (IOException ioe) {
                 throw new HyracksDataException(ioe);
             }
@@ -112,7 +128,6 @@
         @Override
         public void open() throws HyracksDataException {
             initializeMapper();
-            mapper.configure(conf);
             writer.open();
             output = new DataWritingOutputCollector<K2, V2>(writer);
         }
@@ -149,10 +164,13 @@
                     conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
                     conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
                     conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
-                } else if ( splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-                    conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath().toString());
-                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
-                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
+                } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+                    conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
+                            .toString());
+                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getStart());
+                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -164,33 +182,63 @@
 
         public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
             try {
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 initializeMapper();
                 updateConfWithSplit(conf);
-                mapper.configure(conf);
                 conf.setClassLoader(this.getClass().getClassLoader());
                 Object reader;
                 Object key = null;
                 Object value = null;
                 Object inputSplit = inputSplits[partition];
                 reader = getRecordReader(conf, inputSplit);
-                Class inputKeyClass = null;
-                Class inputValueClass = null;
-                Object[] data = new Object[2];
-                  
-                if(conf.getUseNewMapper()){
-                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
-                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplit, new TaskAttemptContext(conf, new TaskAttemptID()));
-                    while(newReader.nextKeyValue()){
-                        data[0] = newReader.getCurrentKey();
-                        if(data[0] == null) {
-                            data[0] = NullWritable.get();
+                final Object[] data = new Object[2];
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
+                    org.apache.hadoop.mapreduce.RecordWriter recordWriter = new RecordWriter() {
+
+                        @Override
+                        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+                            // TODO Auto-generated method stub
                         }
-                        data[1] = newReader.getCurrentValue();
-                        super.map(data);
-                    }
+
+                        @Override
+                        public void write(Object key, Object value) throws IOException, InterruptedException {
+                            data[0] = key;
+                            data[1] = value;
+                            writer.writeData(data);
+                        }
+                    };;;
+
+                    OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
+                            .getOutputCommitter(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+                    StatusReporter statusReporter = new StatusReporter() {
+                        @Override
+                        public void setStatus(String arg0) {
+                        }
+
+                        @Override
+                        public void progress() {
+                        }
+
+                        @Override
+                        public Counter getCounter(String arg0, String arg1) {
+                            return null;
+                        }
+
+                        @Override
+                        public Counter getCounter(Enum<?> arg0) {
+                            return null;
+                        }
+                    };;;
+                    context = new org.apache.hadoop.mapreduce.Mapper().new Context(jobConf, new TaskAttemptID(),
+                            newReader, recordWriter, outputCommitter, statusReporter,
+                            (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
+                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
+                    ((org.apache.hadoop.mapreduce.Mapper) mapper).run(context);
                 } else {
-                    RecordReader oldReader = (RecordReader)reader;
+                    Class inputKeyClass = null;
+                    Class inputValueClass = null;
+
+                    RecordReader oldReader = (RecordReader) reader;
                     if (reader instanceof SequenceFileRecordReader) {
                         inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass();
                         inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass();
@@ -207,7 +255,7 @@
                     }
                     oldReader.close();
                 }
-                
+
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -221,7 +269,7 @@
     }
 
     private static final long serialVersionUID = 1L;
-    private Class<? extends Mapper> mapperClass;
+    private Class mapperClass;
     private InputSplitsProxy inputSplitsProxy;
     private transient Object[] inputSplits;
     private boolean selfRead = false;
@@ -229,7 +277,7 @@
     private void initializeSplitInfo(Object[] splits) throws IOException {
         jobConf = super.getJobConf();
         InputFormat inputFormat = jobConf.getInputFormat();
-        inputSplitsProxy = new InputSplitsProxy(jobConf,splits);
+        inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
     }
 
     public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
@@ -264,30 +312,43 @@
         return recordDescriptor;
     }
 
-    private Mapper<K1, V1, K2, V2> createMapper() throws Exception {
+    private Object createMapper() throws Exception {
+        Object mapper;
         if (mapperClass != null) {
             return mapperClass.newInstance();
         } else {
-            String mapperClassName = super.getJobConf().getMapperClass().getName();
-            Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
-            mapperClass = (Class<? extends Mapper>) mapper.getClass();
-            return (Mapper) mapper;
+            String mapperClassName = null;
+            if (jobConf.getUseNewMapper()) {
+                JobContext jobContext = new JobContext(jobConf, null);
+                mapperClass = jobContext.getMapperClass();
+                mapperClassName = mapperClass.getName();
+            } else {
+                mapperClass = super.getJobConf().getMapperClass();
+                mapperClassName = mapperClass.getName();
+            }
+            mapper = getHadoopClassFactory().createMapper(mapperClassName,jobConf);
         }
+        return mapper;
     }
 
-    private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, InterruptedException {
-        if(conf.getUseNewMapper()){
-            JobContext context = new JobContext(conf,null);
-            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
-            TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf,new TaskAttemptID());
-            return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+    private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException,
+            InterruptedException {
+        if (conf.getUseNewMapper()) {
+            JobContext context = new JobContext(conf, null);
+            org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
+                    .newInstance(context.getInputFormatClass(), conf);
+            TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(conf,
+                    new TaskAttemptID());
+            return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit) inputSplit,
+                    taskAttemptContext);
         } else {
             Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, super.createReporter());
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
+                    .createReporter());
         }
     }
-    
+
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
@@ -300,22 +361,25 @@
                 if (inputSplits == null) {
                     inputSplits = inputSplitsProxy.toInputSplits(conf);
                 }
-                Object reader = getRecordReader(conf,inputSplits[partition]);
-                if(conf.getUseNewMapper()) {
-                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
-                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplits[partition], new TaskAttemptContext(conf, new TaskAttemptID()));
+                Object reader = getRecordReader(conf, inputSplits[partition]);
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader) reader;
+                    newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplits[partition],
+                            new TaskAttemptContext(conf, new TaskAttemptID()));
                     newReader.nextKeyValue();
                     Object key = newReader.getCurrentKey();
                     Class keyClass = null;
                     if (key == null) {
                         keyClass = Class.forName("org.apache.hadoop.io.NullWritable");
                     }
-                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) keyClass,
-                            (Class<? extends Writable>) newReader.getCurrentValue().getClass());
+                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                            (Class<? extends Writable>) keyClass, (Class<? extends Writable>) newReader
+                                    .getCurrentValue().getClass());
                 } else {
-                RecordReader oldReader = (RecordReader)reader;    
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) oldReader
-                        .createKey().getClass(), (Class<? extends Writable>) oldReader.createValue().getClass());
+                    RecordReader oldReader = (RecordReader) reader;
+                    recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                            (Class<? extends Writable>) oldReader.createKey().getClass(),
+                            (Class<? extends Writable>) oldReader.createValue().getClass());
                 }
                 return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
             } else {
@@ -338,7 +402,7 @@
                     readMapOp.mapInput();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
-                } 
+                }
                 readMapOp.close();
             }
         };
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index b7d3296..f208669 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -18,16 +18,21 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progress;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
@@ -51,17 +56,106 @@
 
 public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
     private class ReducerAggregator implements IGroupAggregator {
-        private Reducer<K2, V2, K3, V3> reducer;
+        private Object reducer;
         private DataWritingOutputCollector<K3, V3> output;
         private Reporter reporter;
+        private ReducerContext reducerContext; 
+        RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
+            
+            @Override
+            public boolean next() throws IOException {
+                return false;
+            }
+            
+            @Override
+            public DataInputBuffer getValue() throws IOException {
+                return null;
+            }
+            
+            @Override
+            public Progress getProgress() {
+                return null;
+            }
+            
+            @Override
+            public DataInputBuffer getKey() throws IOException {
+                return null;
+            }
+            
+            @Override
+            public void close() throws IOException {
+                
+            }
+        };
+        
+    
+        class ReducerContext extends org.apache.hadoop.mapreduce.Reducer.Context {
+            private HadoopReducerOperatorDescriptor.ValueIterator iterator;
+            
+            @SuppressWarnings("unchecked")
+            ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException{
+            
+                reducer.super(conf,new TaskAttemptID(),rawKeyValueIterator,null,null,null,null,null,null,Class.forName("org.apache.hadoop.io.NullWritable"),Class.forName("org.apache.hadoop.io.NullWritable"));
+            }
+            
+            public  void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
+                iterator = iter;
+            }
+            
+            @Override
+            public Iterable<V2> getValues() throws IOException, InterruptedException {
+              return new Iterable<V2>() {
+                @Override
+                public Iterator<V2> iterator() {
+                    return iterator;
+                }
+              };
+            }
+            
+            /** Start processing next unique key. */
+            @Override
+            public boolean nextKey() throws IOException,InterruptedException {
+                boolean hasMore = iterator.hasNext();
+                if(hasMore){
+                    nextKeyValue();
+                }
+                return hasMore;
+            }
 
-        public ReducerAggregator(Reducer<K2, V2, K3, V3> reducer) {
+            /**
+             * Advance to the next key/value pair.
+             */
+            @Override
+            public boolean nextKeyValue() throws IOException, InterruptedException {
+                iterator.next();
+                return true;
+            }
+
+            public Object getCurrentKey() {
+              return iterator.getKey();
+            }
+
+            @Override
+            public Object getCurrentValue() {
+              return iterator.getValue();
+            }
+            
+            /**
+             * Generate an output key/value pair.
+             */
+            @Override
+            public void write(Object key, Object value
+                              ) throws IOException, InterruptedException {
+              output.collect(key, value);
+            }
+
+        }
+            
+        public ReducerAggregator(Object reducer) throws HyracksDataException{
             this.reducer = reducer;
-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-            reducer.configure(getJobConf());
+            initializeReducer();
             output = new DataWritingOutputCollector<K3, V3>();
             reporter = new Reporter() {
-
                 @Override
                 public void progress() {
 
@@ -101,15 +195,22 @@
 
         @Override
         public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
-
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             ValueIterator i = new ValueIterator();
             i.reset(reader);
             output.setWriter(writer);
             try {
-
-                // -- - reduce - --
-                reducer.reduce(i.getKey(), i, output, reporter);
-
+                if(jobConf.getUseNewReducer()){
+                    try {
+                        reducerContext.setIterator(i);
+                        ((org.apache.hadoop.mapreduce.Reducer)reducer).run(reducerContext);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        throw new HyracksDataException(e);
+                    }
+                } else {  
+                    ((org.apache.hadoop.mapred.Reducer)reducer).reduce(i.getKey(), i, output, reporter);
+                }    
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -119,11 +220,34 @@
         public void close() throws HyracksDataException {
             // -- - close - --
             try {
-                reducer.close();
+                if(!jobConf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapred.Reducer)reducer).close();
+                }    
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
+        
+        private void initializeReducer() throws HyracksDataException {
+            jobConf.setClassLoader(this.getClass().getClassLoader());
+            if(!jobConf.getUseNewReducer()) {
+                ((org.apache.hadoop.mapred.Reducer)reducer).configure(getJobConf());    
+            } else {
+                try {
+                    reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer)reducer,jobConf);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    throw new HyracksDataException(e);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    throw new HyracksDataException(e);
+                } catch (RuntimeException e){
+                    e.printStackTrace();
+                } catch (ClassNotFoundException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
     private class ValueIterator implements Iterator<V2> {
@@ -134,6 +258,10 @@
         public K2 getKey() {
             return key;
         }
+        
+        public V2 getValue() {
+            return value;
+        }
 
         @Override
         public boolean hasNext() {
@@ -179,7 +307,7 @@
     }
 
     private static final long serialVersionUID = 1L;
-    private Class<? extends Reducer> reducerClass;
+    private Class reducerClass;
     private IComparatorFactory comparatorFactory;
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
@@ -188,13 +316,19 @@
         this.comparatorFactory = comparatorFactory;
     }
 
-    private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
+    private Object createReducer() throws Exception {
         if (reducerClass != null) {
             return reducerClass.newInstance();
         } else {
-            Object reducer = getHadoopClassFactory().createReducer(getJobConf().getReducerClass().getName());
-            reducerClass = (Class<? extends Reducer>) reducer.getClass();
-            return (Reducer) reducer;
+            Object reducer;
+            if(jobConf.getUseNewReducer()){
+                JobContext jobContext = new JobContext(jobConf, null);
+                reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getReducerClass();
+            } else {
+                reducerClass = (Class<? extends Reducer>) jobConf.getReducerClass();
+            }
+            reducer = getHadoopClassFactory().createReducer(reducerClass.getName(),jobConf);
+            return reducer;
         }
     }
 
@@ -204,6 +338,7 @@
         try {
             if (this.comparatorFactory == null) {
                 String comparatorClassName = getJobConf().getOutputValueGroupingComparator().getClass().getName();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                 RawComparator rawComparator = null;
                 if (comparatorClassName != null) {
                     Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index 8137c62..f4dbe4a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -14,21 +14,22 @@
  */
 package edu.uci.ics.hyracks.dataflow.hadoop.util;
 
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
 
 public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
 
     @Override
-    public Mapper createMapper(String mapClassName) throws Exception {
+    public Object createMapper(String mapClassName,JobConf conf) throws Exception {
         Class clazz = loadClass(mapClassName);
-        return (Mapper) clazz.newInstance();
+        return ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
-    public Reducer createReducer(String reduceClassName) throws Exception {
+    public Object createReducer(String reduceClassName,JobConf conf) throws Exception {
         Class clazz = loadClass(reduceClassName);
-        return (Reducer) clazz.newInstance();
+        return  ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index 1ce78ef..22db448 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -16,14 +16,13 @@
 
 import java.io.Serializable;
 
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobConf;
 
 public interface IHadoopClassFactory extends Serializable {
 
-    public Mapper createMapper(String mapClassName) throws Exception;
+    public Object createMapper(String mapClassName,JobConf conf) throws Exception;
 
-    public Reducer createReducer(String reduceClassName) throws Exception;
+    public Object createReducer(String reduceClassName,JobConf conf) throws Exception;
 
     public Class loadClass(String className) throws Exception;
 }