Merged -r 438:524 from trunk into branch

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@525 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
index dea84dd..0272d6e 100644
--- a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,15 @@
-#Fri May 20 19:34:05 PDT 2011
+#Thu Jun 02 13:11:16 PDT 2011
 eclipse.preferences.version=1
+org.eclipse.jdt.core.codeComplete.argumentPrefixes=
+org.eclipse.jdt.core.codeComplete.argumentSuffixes=
+org.eclipse.jdt.core.codeComplete.fieldPrefixes=
+org.eclipse.jdt.core.codeComplete.fieldSuffixes=
+org.eclipse.jdt.core.codeComplete.localPrefixes=
+org.eclipse.jdt.core.codeComplete.localSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
 org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
@@ -14,7 +24,7 @@
 org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
 org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
 org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=49
+org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
 org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
 org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
 org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
@@ -48,18 +58,18 @@
 org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
 org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
 org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=true
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
+org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
+org.eclipse.jdt.core.formatter.comment.format_block_comments=true
 org.eclipse.jdt.core.formatter.comment.format_header=false
 org.eclipse.jdt.core.formatter.comment.format_html=true
 org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
+org.eclipse.jdt.core.formatter.comment.format_line_comments=true
 org.eclipse.jdt.core.formatter.comment.format_source_code=true
 org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
 org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
 org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
 org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=9999
+org.eclipse.jdt.core.formatter.comment.line_length=80
 org.eclipse.jdt.core.formatter.compact_else_if=true
 org.eclipse.jdt.core.formatter.continuation_indentation=2
 org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
@@ -246,7 +256,7 @@
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
 org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=false
+org.eclipse.jdt.core.formatter.join_lines_in_comments=true
 org.eclipse.jdt.core.formatter.join_wrapped_lines=true
 org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
 org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
@@ -257,7 +267,7 @@
 org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
 org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
 org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=false
+org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
 org.eclipse.jdt.core.formatter.tabulation.char=space
 org.eclipse.jdt.core.formatter.tabulation.size=4
 org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
diff --git a/hyracks-dataflow-hadoop/pom.xml b/hyracks-dataflow-hadoop/pom.xml
index 0938b30..250c1af 100644
--- a/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks-dataflow-hadoop/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-hadoop</artifactId>
-  <version>0.1.5</version>
+  <version>0.1.7-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.5</version>
+    <version>0.1.7-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,14 +27,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.5</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.5</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
@@ -54,7 +54,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.5</version>
+  		<version>0.1.7-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
   </dependencies>
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c3167cb..fd549b3 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -71,16 +71,8 @@
             Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             jobConf = getJobConf();
             populateCache(jobConf);
-            try {
-                mapper = createMapper();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
             conf = new JobConf(jobConf);
             conf.setClassLoader(jobConf.getClassLoader());
-            if (!jobConf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
-            }
             reporter = createReporter();
         }
 
@@ -143,6 +135,19 @@
             }
             this.writer = writer;
         }
+
+        protected void initializeMapper() throws HyracksDataException {
+            super.initializeMapper();
+            try {
+                mapper = createMapper(conf);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            if (!conf.getUseNewMapper()) {
+                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+            }
+        }
+
     }
 
     private class ReaderMapperOperator extends MapperBaseOperator {
@@ -167,10 +172,10 @@
                 } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
                     conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
                             .toString());
-                    conf.setLong("map.input.start",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
-                    conf.setLong("map.input.length",
-                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
+                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getStart());
+                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+                            .getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -180,10 +185,22 @@
             }
         }
 
+        protected void initializeMapper() throws HyracksDataException {
+            super.initializeMapper();
+            updateConfWithSplit(conf);
+            try {
+                mapper = createMapper(conf);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            if (!conf.getUseNewMapper()) {
+                ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+            }
+        }
+
         public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
             try {
                 initializeMapper();
-                updateConfWithSplit(conf);
                 conf.setClassLoader(this.getClass().getClassLoader());
                 Object reader;
                 Object key = null;
@@ -209,7 +226,7 @@
                     };;;
 
                     OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
-                            .getOutputCommitter(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+                            .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
                     StatusReporter statusReporter = new StatusReporter() {
                         @Override
                         public void setStatus(String arg0) {
@@ -229,7 +246,7 @@
                             return null;
                         }
                     };;;
-                    context = new org.apache.hadoop.mapreduce.Mapper().new Context(jobConf, new TaskAttemptID(),
+                    context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
                             newReader, recordWriter, outputCommitter, statusReporter,
                             (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
                     newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
@@ -298,9 +315,9 @@
         String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
         try {
             if (hadoopClassFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
+                        .forName(mapOutputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -312,21 +329,21 @@
         return recordDescriptor;
     }
 
-    private Object createMapper() throws Exception {
+    private Object createMapper(JobConf conf) throws Exception {
         Object mapper;
         if (mapperClass != null) {
-            return ReflectionUtils.newInstance(mapperClass, jobConf);
+            return ReflectionUtils.newInstance(mapperClass, conf);
         } else {
             String mapperClassName = null;
             if (jobConf.getUseNewMapper()) {
-                JobContext jobContext = new JobContext(jobConf, null);
+                JobContext jobContext = new JobContext(conf, null);
                 mapperClass = jobContext.getMapperClass();
                 mapperClassName = mapperClass.getName();
             } else {
-                mapperClass = super.getJobConf().getMapperClass();
+                mapperClass = conf.getMapperClass();
                 mapperClassName = mapperClass.getName();
             }
-            mapper = getHadoopClassFactory().createMapper(mapperClassName, jobConf);
+            mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
         }
         return mapper;
     }
@@ -344,8 +361,8 @@
         } else {
             Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
-                    super.createReporter());
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
+                    .createReporter());
         }
     }
 
@@ -383,8 +400,8 @@
                 }
                 return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
             } else {
-                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
-                        recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+                        .getInputRecordDescriptor(this.odId, 0));
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 2d44c79..5857f36 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,13 +23,14 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -310,11 +311,13 @@
     private static final long serialVersionUID = 1L;
     private Class reducerClass;
     private IComparatorFactory comparatorFactory;
+    private boolean useAsCombiner = false;
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
-            IHadoopClassFactory classFactory) {
+            IHadoopClassFactory classFactory, boolean useAsCombiner) {
         super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
         this.comparatorFactory = comparatorFactory;
+        this.useAsCombiner = useAsCombiner;
     }
 
     private Object createReducer() throws Exception {
@@ -322,12 +325,22 @@
             return ReflectionUtils.newInstance(reducerClass, getJobConf());
         } else {
             Object reducer;
-            if (getJobConf().getUseNewReducer()) {
-                JobContext jobContext = new JobContext(getJobConf(), null);
-                reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
-                        .getReducerClass();
+            if (!useAsCombiner) {
+                if (getJobConf().getUseNewReducer()) {
+                    JobContext jobContext = new JobContext(getJobConf(), null);
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+                            .getReducerClass();
+                } else {
+                    reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+                }
             } else {
-                reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+                if (getJobConf().getUseNewReducer()) {
+                    JobContext jobContext = new JobContext(getJobConf(), null);
+                    reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+                            .getCombinerClass();
+                } else {
+                    reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
+                }
             }
             reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
             return reducer;
@@ -382,9 +395,8 @@
         RecordDescriptor recordDescriptor = null;
         try {
             if (classFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
-                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
-                        (Class<? extends Writable>) Class.forName(outputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+                        .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 4dcfa61..2154f5a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -21,14 +21,13 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
@@ -49,63 +48,81 @@
         JobConf conf;
         Path finalOutputFile;
         Path tempOutputFile;
+        Path tempDir;
 
-        HadoopFileWriter(Object recordWriter, Path tempOutputFile, Path outputFile, JobConf conf) {
+        HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
             this.recordWriter = recordWriter;
             this.conf = conf;
-            this.finalOutputFile = outputFile;
-            this.tempOutputFile = tempOutputFile;
+            initialize(index, conf);
+        }
+
+        private void initialize(int index, JobConf conf) throws Exception {
+            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
+                boolean isMap = conf.getNumReduceTasks() == 0;
+                TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
+                conf.set("mapred.task.id", taskAttempId.toString());
+                String suffix = new String("part-00000");
+                suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+                suffix = suffix + index;
+                outputPath = new Path(conf.get("mapred.output.dir"));
+                tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+                FileSystem fileSys = tempDir.getFileSystem(conf);
+                if (!fileSys.mkdirs(tempDir)) {
+                    throw new IOException("Mkdirs failed to create " + tempDir.toString());
+                }
+                tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
+                tempOutputFile = new Path(tempOutputFile, suffix);
+                finalOutputFile = new Path(outputPath, suffix);
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(
+                            conf, null);
+                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
+                            .newInstance(jobContext.getOutputFormatClass(), conf);
+                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+                } else {
+                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
+                            new Progressable() {
+                                @Override
+                                public void progress() {
+                                }
+                            });
+                }
+            }
         }
 
         @Override
         public void write(Object[] record) throws Exception {
-            if (conf.getUseNewMapper()) {
-                ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
-            } else {
-                ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+            if (recordWriter != null) {
+                if (conf.getUseNewMapper()) {
+                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
+                } else {
+                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+                }
             }
         }
 
         @Override
         public void close() {
             try {
-                if (conf.getUseNewMapper()) {
-                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
-                            new TaskAttemptID()));
-                } else {
-                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
+                if (recordWriter != null) {
+                    if (conf.getUseNewMapper()) {
+                        ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
+                                new TaskAttemptID()));
+                    } else {
+                        ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
+                    }
+                    if (outputPath != null) {
+                        FileSystem fileSystem = FileSystem.get(conf);
+                        fileSystem.rename(tempOutputFile, finalOutputFile);
+                        fileSystem.delete(tempDir, true);
+                    }
                 }
-                FileSystem.get(conf).rename(tempOutputFile, finalOutputFile);
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
 
-    private static class HadoopSequenceWriter implements IRecordWriter {
-        private Writer writer;
-
-        HadoopSequenceWriter(Writer writer) throws Exception {
-            this.writer = writer;
-        }
-
-        @Override
-        public void close() {
-            try {
-                writer.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void write(Object[] record) throws Exception {
-            Object key = record[0];
-            Object value = record[1];
-            writer.append(key, value);
-        }
-    }
-
     private static final long serialVersionUID = 1L;
     Map<String, String> jobConfMap;
 
@@ -114,46 +131,9 @@
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
         conf.setClassLoader(this.getClass().getClassLoader());
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = FileSystem.get(conf);
-        } catch (IOException ioe) {
-            ioe.printStackTrace();
-        }
-        Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
-        Path tempOutputFile = null;
-        Path finalOutputFile = null;
-        checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
+        FileSystem fileSystem = FileSystem.get(conf);
         Object recordWriter = null;
-        Object outputFormat = null;
-        String taskAttempId = new TaskAttemptID().toString();
-        conf.set("mapred.task.id", taskAttempId);
-        outputPath = new Path(conf.get("mapred.output.dir"));
-        outputTempPath = new Path(outputPath, "_temporary");
-        if (outputPath != null && !fileSystem.exists(outputPath)) {
-            fileSystem.mkdirs(outputTempPath);
-        }
-        String suffix = new String("part-r-00000");
-        suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-        suffix = suffix + index;
-        tempOutputFile = new Path(outputTempPath, "_" + taskAttempId + "/" + suffix);
-        if (conf.getNumReduceTasks() == 0) {
-            suffix.replace("-r-", "-m-");
-        }
-        finalOutputFile = new Path(outputPath, suffix);
-        if (conf.getUseNewMapper()) {
-            org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
-                    .newInstance((new JobContext(conf, null)).getOutputFormatClass(), conf);
-            recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
-        } else {
-            recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, suffix, new Progressable() {
-                @Override
-                public void progress() {
-                }
-            });
-        }
-
-        return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
+        return new HadoopFileWriter(recordWriter, index, conf);
     }
 
     Path outputPath;
@@ -219,7 +199,8 @@
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
         Object outputFormat = null;
         if (conf.getUseNewMapper()) {
-            outputFormat = ReflectionUtils.newInstance(new JobContext(conf, null).getOutputFormatClass(), conf);
+            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null)
+                    .getOutputFormatClass(), conf);
         } else {
             outputFormat = conf.getOutputFormat();
         }
@@ -234,18 +215,15 @@
 
             FileSplit[] outputFileSplits = new FileSplit[numOutputters];
             String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
-            System.out.println("absolute path:" + absolutePath);
             for (int index = 0; index < numOutputters; index++) {
                 String suffix = new String("part-00000");
                 suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
                 suffix = suffix + index;
                 String outputPath = absolutePath + "/" + suffix;
-                System.out.println("output path :" + outputPath);
                 outputFileSplits[index] = new FileSplit("localhost", outputPath);
             }
             return outputFileSplits;
         }
-
     }
 
     public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index f4dbe4a..a9f3a71 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -17,19 +17,18 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
-
 public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
 
     @Override
-    public Object createMapper(String mapClassName,JobConf conf) throws Exception {
+    public Object createMapper(String mapClassName, JobConf conf) throws Exception {
         Class clazz = loadClass(mapClassName);
         return ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
-    public Object createReducer(String reduceClassName,JobConf conf) throws Exception {
+    public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
         Class clazz = loadClass(reduceClassName);
-        return  ReflectionUtils.newInstance(clazz, conf);
+        return ReflectionUtils.newInstance(clazz, conf);
     }
 
     @Override
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index 22db448..c2ffeeb 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -20,9 +20,9 @@
 
 public interface IHadoopClassFactory extends Serializable {
 
-    public Object createMapper(String mapClassName,JobConf conf) throws Exception;
+    public Object createMapper(String mapClassName, JobConf conf) throws Exception;
 
-    public Object createReducer(String reduceClassName,JobConf conf) throws Exception;
+    public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
 
     public Class loadClass(String className) throws Exception;
 }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index 29239e3..9226c29 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -36,15 +36,15 @@
         isClasses = new Class[inputSplits.length];
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
-        if(conf.getUseNewMapper()){
+        if (conf.getUseNewMapper()) {
             for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
-                ((Writable)inputSplits[i]).write(dos);
+                isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
+                ((Writable) inputSplits[i]).write(dos);
             }
         } else {
             for (int i = 0; i < inputSplits.length; ++i) {
-                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
-                ((Writable)inputSplits[i]).write(dos);
+                isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
+                ((Writable) inputSplits[i]).write(dos);
             }
         }
         dos.close();
@@ -52,17 +52,16 @@
 
     }
 
-    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
-            IOException {
+    public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
         Object[] splits = new Object[isClasses.length];
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
         for (int i = 0; i < splits.length; ++i) {
             splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
-            if(jobConf.getUseNewMapper()){
-                ((Writable)splits[i]).readFields(dis);
-            }else {
-                ((Writable)splits[i]).readFields(dis);
-            }    
+            if (jobConf.getUseNewMapper()) {
+                ((Writable) splits[i]).readFields(dis);
+            } else {
+                ((Writable) splits[i]).readFields(dis);
+            }
         }
         return splits;
     }