1) All Hadoop* operators now work with a JobConf instance instead of a hashmap. This
   matters because a JobConf supplies default values for keys that have not been
   explicitly configured; a plain map returns null where the JobConf would return a
   valid default. (See the sketch below.)

2) All Hadoop* operators now set the class loader on the JobConf instance they work with.
   This is required because the default class loader used inside JobConf cannot load
   application-specific classes.

3) A Hadoop mapper needs to know the specifics of the input split it is mapping, so that
   it sees an environment similar to the one Hadoop itself provides. Configuring a mapper
   therefore now requires the input splits (see the wiring sketch below). At run time, the
   split for the current partition is chosen and its properties are set in the JobConf
   before the mapper implementation is configured.
   
4) Renamed HDFSWriteOperatorDescriptor to HadoopWriteOperatorDescriptor.
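
Items 1 and 2 boil down to the following pattern in AbstractHadoopOperatorDescriptor
(a condensed sketch of the change in the diff below):

    protected transient JobConf jobConf;

    public JobConf getJobConf() {
        if (jobConf == null) {
            // Rebuild the JobConf from the serialized key/value map, then attach the
            // operator's class loader so application classes (mappers, reducers,
            // custom Writables) can be resolved.
            jobConf = DatatypeHelper.map2JobConf(jobConfMap);
            jobConf.setClassLoader(this.getClass().getClassLoader());
        }
        return jobConf;
    }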

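Because the splits are now supplied by the caller, wiring the read and map operators
changes accordingly. A hypothetical sketch (spec, classFactory, and MyMapper are
placeholders, not part of this change):

    JobConf conf = new JobConf();
    conf.setMapperClass(MyMapper.class);
    // The caller now computes the splits once and hands them to both operators.
    InputSplit[] splits = conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
    HadoopReadOperatorDescriptor reader =
            new HadoopReadOperatorDescriptor(conf, spec, splits);
    HadoopMapperOperatorDescriptor mapper =
            new HadoopMapperOperatorDescriptor(spec, conf, splits, classFactory);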

git-svn-id: https://hyracks.googlecode.com/svn/trunk@144 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
index 9b054e4..2d7bc84 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -14,18 +14,10 @@
  */
 package edu.uci.ics.hyracks.dataflow.hadoop;
 
-import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -42,8 +34,9 @@
 
 public abstract class AbstractHadoopOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	
-	protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
+    protected transient JobConf jobConf;
+
+    protected static class DataWritingOutputCollector<K, V> implements OutputCollector<K, V> {
         private IDataWriter<Object[]> writer;
 
         public DataWritingOutputCollector() {
@@ -55,7 +48,7 @@
 
         @Override
         public void collect(Object key, Object value) throws IOException {
-           	writer.writeData(new Object[] { key, value });
+            writer.writeData(new Object[] { key, value });
         }
 
         public void setWriter(IDataWriter<Object[]> writer) {
@@ -69,38 +62,28 @@
     private static final long serialVersionUID = 1L;
     private final Map<String, String> jobConfMap;
     private IHadoopClassFactory hadoopClassFactory;
-    
-    public abstract RecordDescriptor getRecordDescriptor(JobConf jobConf);
-    	
+
     public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
             IHadoopClassFactory hadoopOperatorFactory) {
         super(spec, 1, 1);
         jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         this.hadoopClassFactory = hadoopOperatorFactory;
-        if(recordDescriptor != null){
-        	recordDescriptors[0]= recordDescriptor;
-        }else{
-        	recordDescriptors[0] = getRecordDescriptor(jobConf);
-        }
+        recordDescriptors[0] = recordDescriptor;
     }
 
-     
     public Map<String, String> getJobConfMap() {
-		return jobConfMap;
-	}
+        return jobConfMap;
+    }
 
+    public IHadoopClassFactory getHadoopClassFactory() {
+        return hadoopClassFactory;
+    }
 
-	public IHadoopClassFactory getHadoopClassFactory() {
-		return hadoopClassFactory;
-	}
+    public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
+        this.hadoopClassFactory = hadoopClassFactory;
+    }
 
-
-	public void setHadoopClassFactory(IHadoopClassFactory hadoopClassFactory) {
-		this.hadoopClassFactory = hadoopClassFactory;
-	}
-
-
-	protected Reporter createReporter() {
+    protected Reporter createReporter() {
         return new Reporter() {
             @Override
             public Counter getCounter(Enum<?> name) {
@@ -110,8 +93,7 @@
             @Override
             public Counter getCounter(String group, String name) {
                 return null;
-            }    
-              
+            }
 
             @Override
             public InputSplit getInputSplit() throws UnsupportedOperationException {
@@ -141,9 +123,13 @@
     }
 
     public JobConf getJobConf() {
-        return DatatypeHelper.map2JobConf(jobConfMap);
+        if (jobConf == null) {
+            jobConf = DatatypeHelper.map2JobConf(jobConfMap);
+            jobConf.setClassLoader(this.getClass().getClassLoader());
+        }
+        return jobConf;
     }
-    
+
     public void populateCache(JobConf jobConf) {
         String cache = jobConf.get(MAPRED_CACHE_FILES);
         System.out.println("cache:" + cache);
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
deleted file mode 100644
index daaa5ea..0000000
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HDFSWriteOperatorDescriptor.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
-import edu.uci.ics.hyracks.dataflow.std.file.RecordWriter;
-
-
-
-public class HDFSWriteOperatorDescriptor extends
-		AbstractFileWriteOperatorDescriptor {
-	
-    private static String nullWritableClassName = NullWritable.class.getName();
-	
-    private static class HDFSWriter extends RecordWriter {
-		
-       HDFSWriter(FileSystem fileSystem, String hdfsPath, int[] columns, char separator)
-           throws Exception {
-           super(columns,separator,new Object[]{fileSystem,hdfsPath});
-       }       
-    
-
-       @Override
-       public OutputStream createOutputStream(Object[] args) throws Exception {
-	       FSDataOutputStream fs = ((FileSystem)args[0]).create(new Path((String)args[1]));
-	       return fs;
-       }
-
-       @Override
-       public void write(Object[] record) throws Exception {
-          if(!nullWritableClassName.equals((record[0].getClass().getName()))){
-              bufferedWriter.write(String.valueOf(record[0]));
-          }
-          if(!nullWritableClassName.equals((record[1].getClass().getName()))){
-              bufferedWriter.write(separator);	 
-              bufferedWriter.write(String.valueOf(record[1]));
-          }	 
-          bufferedWriter.write("\n");
-       }
-    }
-
-    private static class HDFSSequenceWriter extends RecordWriter {
-	private Writer writer;
-	HDFSSequenceWriter(FileSystem fileSystem, String hdfsPath, Writer writer)
-            throws Exception {
-            super(null,COMMA,new Object[]{fileSystem,hdfsPath});
-    	    this.writer = writer;
-        }
-
-	@Override
-	public OutputStream createOutputStream(Object[] args) throws Exception {
-		return null;
-	}
-		
-	@Override
-        public void close() {
-            try {
-                writer.close();
-            } catch (IOException e) {
-	          e.printStackTrace();
-	    }
-        }
-
-        @Override
-        public void write(Object[] record) throws Exception {
-             Object key = record[0];
-             Object value = record[1];
-             writer.append(key, value);
-        }
-    }
-	
-    private static final long serialVersionUID = 1L;
-    private static final char COMMA = ',';
-    private char separator;
-    private boolean sequenceFileOutput = false;
-    Map<String,String> jobConfMap;
-    
-
-    @Override
-    protected IRecordWriter createRecordWriter(File file,int index) throws Exception {
-    	JobConf conf = DatatypeHelper.map2JobConf((HashMap)jobConfMap);
-	System.out.println("replication:" + conf.get("dfs.replication"));
-    	FileSystem fileSystem = null;
-     	try {
-	    fileSystem = FileSystem.get(conf);
-	} catch (IOException ioe) {
-            ioe.printStackTrace();
-	}
-	Path path = new Path(file.getAbsolutePath());
-	checkIfCanWriteToHDFS(new FileSplit[]{new FileSplit("localhost",file)});
-	FSDataOutputStream outputStream = fileSystem.create(path);
-	outputStream.close();
-	if(sequenceFileOutput) {
-            Class keyClass = Class.forName(conf.getOutputKeyClass().getName());  
-	    Class valueClass= Class.forName(conf.getOutputValueClass().getName());
-            conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-   	    Writer writer = SequenceFile.createWriter(fileSystem, conf,path, keyClass, valueClass);
-	    return new HDFSSequenceWriter(fileSystem, file.getAbsolutePath(), writer);
-        } else {
-	    return new HDFSWriter(fileSystem, file.getAbsolutePath(), null, COMMA);
-        }	
-    }
-    
-    private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception{
-    	boolean canWrite = true;
-    	JobConf conf = DatatypeHelper.map2JobConf((HashMap)jobConfMap);
-	FileSystem fileSystem = null;
-	try {
-	    fileSystem = FileSystem.get(conf);
-	    for(FileSplit fileSplit : fileSplits) {
-		Path path = new Path(fileSplit.getLocalFile().getAbsolutePath());
-		canWrite = !fileSystem.exists(path);
-		if(!canWrite){
-	            throw new Exception(" Output path :  already exists");
-		}	
-            }
-	} catch(IOException ioe) {
-	    ioe.printStackTrace();
-	    throw ioe;
-	}
-        return canWrite;
-   }    
-
-    private static String getAbsolutePath(Path path) {
-          StringBuffer absolutePath = new StringBuffer();
-          List<String> ancestorPath = new ArrayList<String>();
-          Path pathElement=path;
-          while(pathElement != null) {
-                ancestorPath.add(0, pathElement.getName());
-             pathElement = pathElement.getParent();
-          }
-          ancestorPath.remove(0);        
-          for(String s : ancestorPath) {
-              absolutePath.append("/");
-              absolutePath.append(s);
-          }
-          return new String(absolutePath);
-     }
-
-     private static FileSplit[] getOutputSplits(JobConf conf,int noOfMappers){
-         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
-         FileSplit[] outputFileSplits = new FileSplit[numOutputters];
-         String absolutePath = getAbsolutePath(FileOutputFormat.getOutputPath(conf));
-         for(int index=0;index<numOutputters;index++) {
-             String suffix = new String("part-00000");
-             suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-             suffix = suffix + index;
-             String outputPath = absolutePath + "/" + suffix;
-             outputFileSplits[index] = new FileSplit("localhost",new File(outputPath));
-         }
-         return outputFileSplits;
-    }
-
-    public HDFSWriteOperatorDescriptor(JobSpecification jobSpec,JobConf jobConf, int numMapTasks) throws Exception{
-        super(jobSpec,getOutputSplits(jobConf,numMapTasks));
-        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
-        checkIfCanWriteToHDFS(super.splits);
-	    this.sequenceFileOutput = 
-	    (jobConf.getOutputFormat() instanceof SequenceFileOutputFormat);
-    }
-}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 8397d86..6ebe0b9 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -17,6 +17,9 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -32,6 +35,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
@@ -41,6 +45,11 @@
         private Reporter reporter;
         private Mapper<K1, V1, K2, V2> mapper;
         private IOpenableDataWriter<Object[]> writer;
+        private int partition;
+
+        public MapperOperator(int partition) {
+            this.partition = partition;
+        };
 
         @Override
         public void close() throws HyracksDataException {
@@ -54,20 +63,39 @@
 
         @Override
         public void open() throws HyracksDataException {
-            JobConf jobConf = getJobConf();
+            jobConf = getJobConf();
             populateCache(jobConf);
             try {
                 mapper = createMapper();
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
-            // -- - configure - --
+            if (inputSplitsProxy != null) {
+                updateConfWithSplit();
+            }
             mapper.configure(jobConf);
             writer.open();
             output = new DataWritingOutputCollector<K2, V2>(writer);
             reporter = createReporter();
         }
 
+        private void updateConfWithSplit() {
+            try {
+                InputSplit[] splits = inputSplitsProxy.toInputSplits(jobConf);
+                InputSplit splitRead = splits[partition];
+                if (splitRead instanceof FileSplit) {
+                    jobConf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
+                    jobConf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
+                    jobConf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                // we do not throw the exception here as we are setting additional parameters that may not be 
+                // required by the mapper. If they are  indeed required,  the configure method invoked on the mapper
+                // shall report an exception because of the missing parameters. 
+            }
+        }
+
         @Override
         public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
             if (index != 0) {
@@ -87,32 +115,36 @@
     }
 
     private static final long serialVersionUID = 1L;
-    private static final String mapClassNameKey = "mapred.mapper.class";
     private Class<? extends Mapper> mapperClass;
+    private InputSplitsProxy inputSplitsProxy;
 
-    public HadoopMapperOperatorDescriptor(JobSpecification spec, Class<? extends Mapper> mapperClass,
-            RecordDescriptor recordDescriptor, JobConf jobConf) {
-        super(spec, recordDescriptor, jobConf, null);
-        this.mapperClass = mapperClass;
+    private void initializeSplitInfo(InputSplit[] splits) throws IOException {
+        jobConf = super.getJobConf();
+        InputFormat inputFormat = jobConf.getInputFormat();
+        inputSplitsProxy = new InputSplitsProxy(splits);
     }
 
-    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory) {
-        super(spec, null, jobConf, hadoopClassFactory);
+    public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, InputSplit[] splits,
+            IHadoopClassFactory hadoopClassFactory) throws IOException {
+        super(spec, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
+        if (splits != null) {
+            initializeSplitInfo(splits);
+        }
     }
 
-    public RecordDescriptor getRecordDescriptor(JobConf conf) {
+    public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) {
         RecordDescriptor recordDescriptor = null;
         String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
         String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
         try {
-            if (getHadoopClassFactory() == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+            if (hadoopClassFactory == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
+                        .forName(mapOutputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),
-                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
+                        (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
+                        (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputValueClassName));
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -124,7 +156,7 @@
         if (mapperClass != null) {
             return mapperClass.newInstance();
         } else {
-            String mapperClassName = super.getJobConfMap().get(mapClassNameKey);
+            String mapperClassName = super.getJobConf().getMapperClass().getName();
             Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
             mapperClass = (Class<? extends Mapper>) mapper.getClass();
             return (Mapper) mapper;
@@ -134,8 +166,8 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
-                recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+        return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+                .getInputRecordDescriptor(getOperatorId(), 0));
     }
 
     public Class<? extends Mapper> getMapperClass() {
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 53d1a7f..871ea25 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -20,13 +20,13 @@
 import java.util.Map;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
@@ -52,11 +52,10 @@
     private Map<String, String> jobConfMap;
     private InputSplitsProxy inputSplitsProxy;
 
-    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec) throws IOException {
+    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, InputSplit[] splits) throws IOException {
         super(spec, 0, 1);
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         InputFormat inputFormat = jobConf.getInputFormat();
-        InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
         RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
         recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
                 .createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
@@ -114,6 +113,7 @@
             public void initialize() throws HyracksDataException {
                 try {
                     JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+                    conf.setClassLoader(this.getClass().getClassLoader());
                     RecordReader hadoopRecordReader;
                     Writable key;
                     Writable value;
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 2a70577..8a22641 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -22,11 +22,11 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
@@ -41,12 +41,11 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.KeyComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
 import edu.uci.ics.hyracks.dataflow.std.group.DeserializedPreclusteredGroupOperator;
+import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
 public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
@@ -179,26 +178,19 @@
 
     private static final long serialVersionUID = 1L;
     private Class<? extends Reducer> reducerClass;
-    private static final String reducerClassKey = "mapred.reducer.class";
-    private static final String comparatorClassKey = "mapred.output.value.groupfn.class";
     private IComparatorFactory comparatorFactory;
 
-    public HadoopReducerOperatorDescriptor(JobSpecification spec, IComparatorFactory comparatorFactory,
-            Class<? extends Reducer> reducerClass, RecordDescriptor recordDescriptor, JobConf jobConf) {
-        super(spec, recordDescriptor, jobConf, new ClasspathBasedHadoopClassFactory());
+    public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
+            IHadoopClassFactory classFactory) {
+        super(spec, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
-        this.reducerClass = reducerClass;
-    }
-
-    public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IHadoopClassFactory classFactory) {
-        super(spec, null, conf, classFactory);
     }
 
     private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
         if (reducerClass != null) {
             return reducerClass.newInstance();
         } else {
-            Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
+            Object reducer = getHadoopClassFactory().createReducer(getJobConf().getReducerClass().getName());
             reducerClass = (Class<? extends Reducer>) reducer.getClass();
             return (Reducer) reducer;
         }
@@ -209,14 +201,14 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         try {
             if (this.comparatorFactory == null) {
-                String comparatorClassName = getJobConfMap().get(comparatorClassKey);
+                String comparatorClassName = getJobConf().getOutputValueGroupingComparator().getClass().getName();
                 RawComparator rawComparator = null;
                 if (comparatorClassName != null) {
                     Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
                     this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
 
                 } else {
-                    String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
+                    String mapOutputKeyClass = getJobConf().getMapOutputKeyClass().getName();
                     if (getHadoopClassFactory() != null) {
                         rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
                     } else {
@@ -235,28 +227,18 @@
         }
     }
 
-    public Class<? extends Reducer> getReducerClass() {
-        return reducerClass;
-    }
-
-    public void setReducerClass(Class<? extends Reducer> reducerClass) {
-        this.reducerClass = reducerClass;
-    }
-
-    @Override
-    public RecordDescriptor getRecordDescriptor(JobConf conf) {
-        String outputKeyClassName = conf.get("mapred.output.key.class");
-        String outputValueClassName = conf.get("mapred.output.value.class");
+    public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
+        String outputKeyClassName = conf.getOutputKeyClass().getName();
+        String outputValueClassName = conf.getOutputValueClass().getName();
         RecordDescriptor recordDescriptor = null;
         try {
-            if (getHadoopClassFactory() == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(outputValueClassName));
+            if (classFactory == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputKeyClassName),
-                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputValueClassName));
+                        (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
+                        (Class<? extends Writable>) classFactory.loadClass(outputValueClassName));
             }
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
new file mode 100644
index 0000000..090bca1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
+import edu.uci.ics.hyracks.dataflow.std.file.RecordWriter;
+
+public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
+
+    private static String nullWritableClassName = NullWritable.class.getName();
+
+    private static class HDFSWriter extends RecordWriter {
+
+        HDFSWriter(FileSystem fileSystem, String hdfsPath, int[] columns, char separator) throws Exception {
+            super(columns, separator, new Object[] { fileSystem, hdfsPath });
+        }
+
+        @Override
+        public OutputStream createOutputStream(Object[] args) throws Exception {
+            FSDataOutputStream fs = ((FileSystem) args[0]).create(new Path((String) args[1]));
+            return fs;
+        }
+
+        @Override
+        public void write(Object[] record) throws Exception {
+            if (!nullWritableClassName.equals((record[0].getClass().getName()))) {
+                bufferedWriter.write(String.valueOf(record[0]));
+            }
+            if (!nullWritableClassName.equals((record[1].getClass().getName()))) {
+                bufferedWriter.write(separator);
+                bufferedWriter.write(String.valueOf(record[1]));
+            }
+            bufferedWriter.write("\n");
+        }
+    }
+
+    private static class HDFSSequenceWriter extends RecordWriter {
+        private Writer writer;
+
+        HDFSSequenceWriter(FileSystem fileSystem, String hdfsPath, Writer writer) throws Exception {
+            super(null, COMMA, new Object[] { fileSystem, hdfsPath });
+            this.writer = writer;
+        }
+
+        @Override
+        public OutputStream createOutputStream(Object[] args) throws Exception {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            try {
+                writer.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void write(Object[] record) throws Exception {
+            Object key = record[0];
+            Object value = record[1];
+            writer.append(key, value);
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+    private static final char COMMA = ',';
+    private boolean sequenceFileOutput = false;
+    Map<String, String> jobConfMap;
+
+    @Override
+    protected IRecordWriter createRecordWriter(File file, int index) throws Exception {
+        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+        conf.setClassLoader(this.getClass().getClassLoader());
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = FileSystem.get(conf);
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+        Path path = new Path(file.getPath());
+        checkIfCanWriteToHDFS(new FileSplit[] { new FileSplit("localhost", file) });
+        FSDataOutputStream outputStream = fileSystem.create(path);
+        outputStream.close();
+        if (sequenceFileOutput) {
+            Class keyClass = Class.forName(conf.getOutputKeyClass().getName());
+            Class valueClass = Class.forName(conf.getOutputValueClass().getName());
+            conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+            Writer writer = SequenceFile.createWriter(fileSystem, conf, path, keyClass, valueClass);
+            return new HDFSSequenceWriter(fileSystem, file.getAbsolutePath(), writer);
+        } else {
+            return new HDFSWriter(fileSystem, file.getPath(), null, COMMA);
+        }
+    }
+
+    private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
+        boolean canWrite = true;
+        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = FileSystem.get(conf);
+            for (FileSplit fileSplit : fileSplits) {
+                Path path = new Path(fileSplit.getLocalFile().getPath());
+                canWrite = !fileSystem.exists(path);
+                if (!canWrite) {
+                    throw new Exception(" Output path :  already exists : " + path);
+                }
+            }
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+            throw ioe;
+        }
+        return canWrite;
+    }
+
+    private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) {
+        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
+        FileSplit[] outputFileSplits = new FileSplit[numOutputters];
+        String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
+        for (int index = 0; index < numOutputters; index++) {
+            String suffix = new String("part-00000");
+            suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+            suffix = suffix + index;
+            String outputPath = absolutePath + "/" + suffix;
+            outputFileSplits[index] = new FileSplit("localhost", new File(outputPath));
+        }
+        return outputFileSplits;
+    }
+
+    public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
+        super(jobSpec, getOutputSplits(jobConf, numMapTasks));
+        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
+        checkIfCanWriteToHDFS(super.splits);
+        this.sequenceFileOutput = (jobConf.getOutputFormat() instanceof SequenceFileOutputFormat);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-hadoop/target/maven-archiver/pom.properties b/hyracks/hyracks-dataflow-hadoop/target/maven-archiver/pom.properties
new file mode 100644
index 0000000..1c664bc
--- /dev/null
+++ b/hyracks/hyracks-dataflow-hadoop/target/maven-archiver/pom.properties
@@ -0,0 +1,5 @@
+#Generated by Maven
+#Tue Oct 12 17:32:14 PDT 2010
+version=0.1.3-SNAPSHOT
+groupId=edu.uci.ics.hyracks
+artifactId=hyracks-dataflow-hadoop