1) Fixed the reduce operator so that it closes correctly.
 Reducer.close() was not being called, so when running Hive on Hyracks the query never completed and the job ran indefinitely (see the close-path sketch after these notes).
 
2) Dynamic record descriptors
   The Hadoop read and map operators now build a record descriptor per partition, because the key/value classes reported by a partition's RecordReader (and hence the record descriptor) may differ from one partition to the next (see the descriptor sketch after these notes).
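
The ReducerAggregator hunk below only shows the context-classloader change, so the following is a minimal, illustrative sketch of the close path being restored; the wrapper and method names are hypothetical, not the actual Hyracks classes.

    import java.io.IOException;

    import org.apache.hadoop.mapred.Reducer;

    // Hypothetical wrapper, used only to illustrate where close() belongs.
    class ClosingReducerWrapper<K2, V2, K3, V3> {
        private final Reducer<K2, V2, K3, V3> reducer;

        ClosingReducerWrapper(Reducer<K2, V2, K3, V3> reducer) {
            this.reducer = reducer;
        }

        // Invoked once all groups for a partition have been reduced.
        void finish() throws IOException {
            // Reducers that flush their remaining state in close()
            // (as Hive's operators do) never emit final output if this
            // call is skipped, and the job appears to run forever.
            reducer.close();
        }
    }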
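The per-partition descriptor logic added to HadoopMapperOperatorDescriptor.createPushRuntime is summarized by this sketch; it assumes RecordDescriptor lives under edu.uci.ics.hyracks.api.dataflow.value and uses Reporter.NULL in place of the operator's own reporter.

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;

    class PartitionDescriptorSketch {
        // Ask the partition's RecordReader for its concrete key/value
        // classes and build the record descriptor from them, instead of
        // assuming one descriptor fits every partition.
        @SuppressWarnings("unchecked")
        static RecordDescriptor describe(JobConf conf, InputSplit split) throws Exception {
            RecordReader reader = conf.getInputFormat().getRecordReader(split, conf, Reporter.NULL);
            return DatatypeHelper.createKeyValueRecordDescriptor(
                    (Class<? extends Writable>) reader.createKey().getClass(),
                    (Class<? extends Writable>) reader.createValue().getClass());
        }
    }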
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@152 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
index 2d7bc84..90354f7 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -131,29 +131,33 @@
     }
 
     public void populateCache(JobConf jobConf) {
-        String cache = jobConf.get(MAPRED_CACHE_FILES);
-        System.out.println("cache:" + cache);
-        if (cache == null) {
-            return;
-        }
-        String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
-        System.out.println("localCache:" + localCache);
-        if (localCache != null) {
-            return;
-        }
-        localCache = "";
-        StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
-        while (cacheTokenizer.hasMoreTokens()) {
-            if (!"".equals(localCache)) {
-                localCache += ",";
+        try {
+            String cache = jobConf.get(MAPRED_CACHE_FILES);
+            System.out.println("cache:" + cache);
+            if (cache == null) {
+                return;
             }
-            try {
-                localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+            String localCache = jobConf.get(MAPRED_CACHE_LOCALFILES);
+            System.out.println("localCache:" + localCache);
+            if (localCache != null) {
+                return;
             }
+            localCache = "";
+            StringTokenizer cacheTokenizer = new StringTokenizer(cache, ",");
+            while (cacheTokenizer.hasMoreTokens()) {
+                if (!"".equals(localCache)) {
+                    localCache += ",";
+                }
+                try {
+                    localCache += DCacheClient.get().get(cacheTokenizer.nextToken());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
+            System.out.println("localCache:" + localCache);
+        } catch (Exception e) {
+
         }
-        jobConf.set(MAPRED_CACHE_LOCALFILES, localCache);
-        System.out.println("localCache:" + localCache);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 6ebe0b9..e169a9f 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -65,6 +66,7 @@
         public void open() throws HyracksDataException {
             jobConf = getJobConf();
             populateCache(jobConf);
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             try {
                 mapper = createMapper();
             } catch (Exception e) {
@@ -117,6 +119,7 @@
     private static final long serialVersionUID = 1L;
     private Class<? extends Mapper> mapperClass;
     private InputSplitsProxy inputSplitsProxy;
+    private transient InputSplit[] inputSplits;
 
     private void initializeSplitInfo(InputSplit[] splits) throws IOException {
         jobConf = super.getJobConf();
@@ -165,9 +168,23 @@
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
-                .getInputRecordDescriptor(getOperatorId(), 0));
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        RecordDescriptor recordDescriptor = null;
+        JobConf conf = getJobConf();
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+        try {
+            if (inputSplits == null) {
+                inputSplits = inputSplitsProxy.toInputSplits(conf);
+            }
+            RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
+                    super.createReporter());
+            recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
+                    .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+
+        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescriptor);
     }
 
     public Class<? extends Mapper> getMapperClass() {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 871ea25..a75e6a9 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -51,6 +51,14 @@
     private String inputFormatClassName;
     private Map<String, String> jobConfMap;
     private InputSplitsProxy inputSplitsProxy;
+    private transient JobConf jobConf;
+
+    public JobConf getJobConf() {
+        if (jobConf == null) {
+            jobConf = DatatypeHelper.map2JobConf(jobConfMap);
+        }
+        return jobConf;
+    }
 
     public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, InputSplit[] splits) throws IOException {
         super(spec, 0, 1);
@@ -59,9 +67,35 @@
         RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
         recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
                 .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
+        System.out.println(" READER : KEY : " + recordReader.createKey().getClass().getName());
+        System.out.println(" READER : VALUE : " + recordReader.createValue().getClass().getName());
+        System.out.println(" Record descriptor : " + recordDescriptors[0].getFields()[0].getClass().getName());
+        System.out.println(" Record descriptor : " + recordDescriptors[0].getFields()[1].getClass().getName());
         this.setPartitionConstraint(new PartitionCountConstraint(splits.length));
         inputSplitsProxy = new InputSplitsProxy(splits);
         this.inputFormatClassName = inputFormat.getClass().getName();
+        try {
+            checkSplits(jobConf);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public InputSplit[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+        return inputSplitsProxy.toInputSplits(getJobConf());
+    }
+
+    private void checkSplits(JobConf conf) throws Exception {
+        InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
+        for (InputSplit inputSplit : splits) {
+            Class inputFormatClass = Class.forName(inputFormatClassName);
+            InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+            RecordReader hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf,
+                    createReporter());
+            System.out.println("split :" + inputSplit);
+            System.out.println(" READER (proxy): KEY : " + hadoopRecordReader.createKey().getClass().getName());
+            System.out.println(" READER (proxy): VALUE : " + hadoopRecordReader.createValue().getClass().getName());
+        }
     }
 
     protected Reporter createReporter() {
@@ -113,10 +147,11 @@
             public void initialize() throws HyracksDataException {
                 try {
                     JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+                    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     conf.setClassLoader(this.getClass().getClassLoader());
                     RecordReader hadoopRecordReader;
-                    Writable key;
-                    Writable value;
+                    Object key;
+                    Object value;
                     InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
                     InputSplit inputSplit = splits[partition];
                     Class inputFormatClass = Class.forName(inputFormatClassName);
@@ -131,13 +166,19 @@
                         inputKeyClass = hadoopRecordReader.createKey().getClass();
                         inputValueClass = hadoopRecordReader.createValue().getClass();
                     }
-                    key = (Writable) inputKeyClass.newInstance();
-                    value = (Writable) inputValueClass.newInstance();
+
+                    key = hadoopRecordReader.createKey();
+                    value = hadoopRecordReader.createValue();
                     ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
                     FrameTupleAppender appender = new FrameTupleAppender(ctx);
                     appender.reset(outBuffer, true);
+                    RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                            (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
+                            (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
+                    /*
                     RecordDescriptor outputRecordDescriptor = recordDescProvider.getOutputRecordDescriptor(
                             getOperatorId(), 0);
+                    */
                     int nFields = outputRecordDescriptor.getFields().length;
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
                     while (hadoopRecordReader.next(key, value)) {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 8a22641..7dbed84 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -56,6 +56,7 @@
 
         public ReducerAggregator(Reducer<K2, V2, K3, V3> reducer) {
             this.reducer = reducer;
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             reducer.configure(getJobConf());
             output = new DataWritingOutputCollector<K3, V3>();
             reporter = new Reporter() {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 090bca1..a2dac50 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
@@ -150,16 +151,27 @@
 
     private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) {
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
-        FileSplit[] outputFileSplits = new FileSplit[numOutputters];
-        String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
-        for (int index = 0; index < numOutputters; index++) {
-            String suffix = new String("part-00000");
-            suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-            suffix = suffix + index;
-            String outputPath = absolutePath + "/" + suffix;
-            outputFileSplits[index] = new FileSplit("localhost", new File(outputPath));
+        if (conf.getOutputFormat() instanceof NullOutputFormat) {
+            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
+            for (int i = 0; i < numOutputters; i++) {
+                String outputPath = "/tmp/" + System.currentTimeMillis() + i;
+                outputFileSplits[i] = new FileSplit("localhost", new File(outputPath));
+            }
+            return outputFileSplits;
+        } else {
+
+            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
+            String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
+            for (int index = 0; index < numOutputters; index++) {
+                String suffix = new String("part-00000");
+                suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+                suffix = suffix + index;
+                String outputPath = absolutePath + "/" + suffix;
+                outputFileSplits[index] = new FileSplit("localhost", new File(outputPath));
+            }
+            return outputFileSplits;
         }
-        return outputFileSplits;
+
     }
 
     public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index 56ddbe5..8137c62 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -19,23 +19,22 @@
 
 public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
 
-	@Override
-	public Mapper createMapper(String mapClassName) throws Exception {
-		Class clazz = loadClass(mapClassName);
-		return (Mapper)clazz.newInstance();
-	}
+    @Override
+    public Mapper createMapper(String mapClassName) throws Exception {
+        Class clazz = loadClass(mapClassName);
+        return (Mapper) clazz.newInstance();
+    }
 
-	@Override
-	public Reducer createReducer(String reduceClassName) throws Exception {
-		Class clazz = loadClass(reduceClassName);
-		return (Reducer)clazz.newInstance();
-	}
+    @Override
+    public Reducer createReducer(String reduceClassName) throws Exception {
+        Class clazz = loadClass(reduceClassName);
+        return (Reducer) clazz.newInstance();
+    }
 
-	@Override
-	public Class loadClass(String className) throws Exception {
-		Class clazz = Class.forName(className);
-		return clazz;
-	}
+    @Override
+    public Class loadClass(String className) throws Exception {
+        Class clazz = Class.forName(className);
+        return clazz;
+    }
 
-	
 }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
index 23f7f6e..10f29c8 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/DatatypeHelper.java
@@ -63,7 +63,7 @@
                 o.readFields(in);
             } catch (IOException e) {
                 e.printStackTrace();
-                // throw new HyracksDataException(e);
+                throw new HyracksDataException(e);
             }
             return o;
         }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index d465471..1ce78ef 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -19,11 +19,11 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 
-public interface IHadoopClassFactory extends Serializable{
+public interface IHadoopClassFactory extends Serializable {
 
-	public Mapper createMapper(String mapClassName) throws Exception;
-	
-	public Reducer createReducer(String reduceClassName) throws Exception;
-	
-	public Class loadClass(String className) throws Exception;
+    public Mapper createMapper(String mapClassName) throws Exception;
+
+    public Reducer createReducer(String reduceClassName) throws Exception;
+
+    public Class loadClass(String className) throws Exception;
 }