modified HadoopMapper operator to work in two modes :
1) SelfReadMode: The mapper reads the input directly from HDFS instead of receiving it from another specific read operator
2) DependentMode :  The mapper is not anymore a source operator, but requires input to be fed by some other operator ( eg a reducer in case of chained MR jobs )
For operators A & B that  connect using a one-to-one connector, A & B can be fused together to form a single operator. The above change maked HadoopReadOperator redundant.
It is not being deleted here as it is a useful operator for reading from HDFS and could be used in other scenarios. 

Modified AbstractHadoopReadOperator to take as argument in the constructor , the input arity. The input arity was earlier assumed to be 1 for Map and Reduce, but is 
0 for Map in the SelfReadMode. 
Modified Reducer to pass the inputArity to base class constructor

git-svn-id: https://hyracks.googlecode.com/svn/trunk@176 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
index 90354f7..89831e8 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -63,9 +63,9 @@
     private final Map<String, String> jobConfMap;
     private IHadoopClassFactory hadoopClassFactory;
 
-    public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
-            IHadoopClassFactory hadoopOperatorFactory) {
-        super(spec, 1, 1);
+    public AbstractHadoopOperatorDescriptor(JobSpecification spec, int inputArity, RecordDescriptor recordDescriptor,
+            JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) {
+        super(spec, inputArity, 1);
         jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         this.hadoopClassFactory = hadoopOperatorFactory;
         recordDescriptors[0] = recordDescriptor;
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index e169a9f..bbe8689 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -25,8 +27,12 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -34,36 +40,32 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
 public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
-    private class MapperOperator implements IOpenableDataWriterOperator {
-        private OutputCollector<K2, V2> output;
-        private Reporter reporter;
-        private Mapper<K1, V1, K2, V2> mapper;
-        private IOpenableDataWriter<Object[]> writer;
-        private int partition;
 
-        public MapperOperator(int partition) {
+    private class MapperBaseOperator {
+        protected OutputCollector<K2, V2> output;
+        protected Reporter reporter;
+        protected Mapper<K1, V1, K2, V2> mapper;
+        protected int partition;
+        protected JobConf conf;
+        protected IOpenableDataWriter<Object[]> writer;
+
+        public MapperBaseOperator(int partition) {
             this.partition = partition;
-        };
-
-        @Override
-        public void close() throws HyracksDataException {
-            try {
-                mapper.close();
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-            writer.close();
         }
 
-        @Override
-        public void open() throws HyracksDataException {
+        protected void initializeMapper() throws HyracksDataException {
             jobConf = getJobConf();
             populateCache(jobConf);
             Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -72,23 +74,84 @@
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
-            if (inputSplitsProxy != null) {
-                updateConfWithSplit();
-            }
-            mapper.configure(jobConf);
-            writer.open();
-            output = new DataWritingOutputCollector<K2, V2>(writer);
+            conf = new JobConf(jobConf);
+            conf.setClassLoader(jobConf.getClassLoader());
             reporter = createReporter();
         }
 
-        private void updateConfWithSplit() {
+        protected void map(Object[] data) throws HyracksDataException {
             try {
-                InputSplit[] splits = inputSplitsProxy.toInputSplits(jobConf);
-                InputSplit splitRead = splits[partition];
+                mapper.map((K1) data[0], (V1) data[1], output, reporter);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            } catch (RuntimeException re) {
+                System.out.println(" Runtime exceptione encoutered for row :" + data[0] + ": " + data[1]);
+                re.printStackTrace();
+            }
+        }
+
+        protected void closeMapper() throws HyracksDataException {
+            try {
+                mapper.close();
+            } catch (IOException ioe) {
+                throw new HyracksDataException(ioe);
+            }
+        }
+
+    }
+
+    private class MapperOperator extends MapperBaseOperator implements IOpenableDataWriterOperator {
+
+        public MapperOperator(int partition) {
+            super(partition);
+        };
+
+        @Override
+        public void close() throws HyracksDataException {
+            super.closeMapper();
+            writer.close();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            initializeMapper();
+            mapper.configure(conf);
+            writer.open();
+            output = new DataWritingOutputCollector<K2, V2>(writer);
+        }
+
+        @Override
+        public void writeData(Object[] data) throws HyracksDataException {
+            super.map(data);
+        }
+
+        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+            if (index != 0) {
+                throw new IllegalArgumentException();
+            }
+            this.writer = writer;
+        }
+    }
+
+    private class ReaderMapperOperator extends MapperBaseOperator {
+
+        public ReaderMapperOperator(int partition, IOpenableDataWriter writer) throws HyracksDataException {
+            super(partition);
+            output = new DataWritingOutputCollector<K2, V2>(writer);
+            this.writer = writer;
+            this.writer.open();
+        }
+
+        protected void updateConfWithSplit(JobConf conf) {
+            try {
+                if (inputSplits == null) {
+                    inputSplits = inputSplitsProxy.toInputSplits(conf);
+                }
+                InputSplit splitRead = inputSplits[partition];
                 if (splitRead instanceof FileSplit) {
-                    jobConf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
-                    jobConf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
-                    jobConf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
+                    conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
+                    conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
+                    conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -98,21 +161,53 @@
             }
         }
 
-        @Override
-        public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
-            if (index != 0) {
-                throw new IllegalArgumentException();
-            }
-            this.writer = writer;
-        }
-
-        @Override
-        public void writeData(Object[] data) throws HyracksDataException {
+        public void mapInput() throws HyracksDataException {
             try {
-                mapper.map((K1) data[0], (V1) data[1], output, reporter);
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                initializeMapper();
+                updateConfWithSplit(conf);
+                mapper.configure(conf);
+                conf.setClassLoader(this.getClass().getClassLoader());
+                RecordReader hadoopRecordReader;
+                Object key;
+                Object value;
+                InputSplit inputSplit = inputSplits[partition];
+                Class inputFormatClass = conf.getInputFormat().getClass();
+                InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+                hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
+                Class inputKeyClass;
+                Class inputValueClass;
+                if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+                    inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+                    inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+                } else {
+                    inputKeyClass = hadoopRecordReader.createKey().getClass();
+                    inputValueClass = hadoopRecordReader.createValue().getClass();
+                }
+
+                key = hadoopRecordReader.createKey();
+                value = hadoopRecordReader.createValue();
+                RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
+                        (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
+                int nFields = outputRecordDescriptor.getFields().length;
+                ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+                Object[] data = new Object[2];
+                while (hadoopRecordReader.next(key, value)) {
+                    data[0] = key;
+                    data[1] = value;
+                    super.map(data);
+                }
+                hadoopRecordReader.close();
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
+
+        }
+
+        public void close() throws HyracksDataException {
+            super.closeMapper();
+            writer.close();
         }
     }
 
@@ -120,6 +215,7 @@
     private Class<? extends Mapper> mapperClass;
     private InputSplitsProxy inputSplitsProxy;
     private transient InputSplit[] inputSplits;
+    private boolean selfRead = false;
 
     private void initializeSplitInfo(InputSplit[] splits) throws IOException {
         jobConf = super.getJobConf();
@@ -127,12 +223,16 @@
         inputSplitsProxy = new InputSplitsProxy(splits);
     }
 
+    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
+            throws IOException {
+        super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
+    }
+
     public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, InputSplit[] splits,
             IHadoopClassFactory hadoopClassFactory) throws IOException {
-        super(spec, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
-        if (splits != null) {
-            initializeSplitInfo(splits);
-        }
+        super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
+        initializeSplitInfo(splits);
+        this.selfRead = true;
     }
 
     public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) {
@@ -169,22 +269,40 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        RecordDescriptor recordDescriptor = null;
+
         JobConf conf = getJobConf();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         try {
-            if (inputSplits == null) {
-                inputSplits = inputSplitsProxy.toInputSplits(conf);
+            if (selfRead) {
+                RecordDescriptor recordDescriptor = null;
+                if (inputSplits == null) {
+                    inputSplits = inputSplitsProxy.toInputSplits(conf);
+                }
+                RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
+                        super.createReporter());
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
+                        .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
+                return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+            } else {
+                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+                        .getInputRecordDescriptor(this.odId, 0));
             }
-            RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
-                    super.createReporter());
-            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
-                    .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
+    }
 
-        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescriptor);
+    private IOperatorNodePushable createSelfReadingMapper(final IHyracksContext ctx, IOperatorEnvironment env,
+            final RecordDescriptor recordDescriptor, final int partition) {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
+                ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
+                readMapOp.mapInput();
+                readMapOp.close();
+            }
+        };
     }
 
     public Class<? extends Mapper> getMapperClass() {
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 7dbed84..728929a 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -183,7 +183,7 @@
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
             IHadoopClassFactory classFactory) {
-        super(spec, getRecordDescriptor(conf, classFactory), conf, classFactory);
+        super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
     }