Refactored HadoopWriteOperatorDescriptor: moved record-writer and output-path
setup from createRecordWriter() into HadoopFileWriter.initialize(), staged
task output under FileOutputCommitter.TEMP_DIR_NAME, removed
HadoopSequenceWriter, and disabled the up-front deletion of pre-existing
output files (close() now promotes the temp file and cleans up the staging
directory).

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@475 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 9f3d532..30584b7 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -28,11 +28,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -49,15 +49,45 @@
         JobConf conf;
         Path finalOutputFile;
         Path tempOutputFile;
-      
+        Path tempDir;
         
-        HadoopFileWriter(Object recordWriter,Path tempOutputFile,Path outputFile,JobConf conf) {
+        HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
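+            // The recordWriter passed in here is only a placeholder
+            // (createRecordWriter() passes null); initialize() below
+            // creates the actual writer.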
             this.recordWriter = recordWriter;
             this.conf = conf;
-            this.finalOutputFile = outputFile;
-            this.tempOutputFile = tempOutputFile;
+            initialize(index, conf);
         }
 
+
+        private void initialize(int index, JobConf conf) throws Exception {
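+            // Builds the task attempt id, the temporary and final output paths
+            // for this partition, and a record writer for either the old (mapred)
+            // or new (mapreduce) output-format API. No-op for NullOutputFormat jobs.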
+            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
+                boolean isMap = conf.getNumReduceTasks() == 0;
+                TaskAttemptID taskAttemptId = new TaskAttemptID("0", index, isMap, index, index);
+                conf.set("mapred.task.id", taskAttemptId.toString());
+                String suffix = "part-00000";
+                suffix = suffix.substring(0, suffix.length() - ("" + index).length());
+                suffix = suffix + index;
+                outputPath = new Path(conf.get("mapred.output.dir"));
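+                // Stage output under the standard "_temporary" directory
+                // (FileOutputCommitter.TEMP_DIR_NAME) so partial results never
+                // appear under the final output path.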
+                tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+                FileSystem fileSys = tempDir.getFileSystem(conf);
+                if (!fileSys.mkdirs(tempDir)) {
+                    throw new IOException("Mkdirs failed to create " + tempDir.toString());
+                }
+                tempOutputFile = new Path(tempDir, new Path("_" + taskAttemptId.toString()));
+                tempOutputFile = new Path(tempOutputFile, suffix);
+                finalOutputFile = new Path(outputPath, suffix);
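+                // The old and new MapReduce APIs construct record writers
+                // differently; branch on which mapper API the job uses.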
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(conf, null);
+                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils.newInstance(jobContext.getOutputFormatClass(), conf);
+                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttemptId));
+                } else {
+                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix, new Progressable() {
+                        @Override
+                        public void progress() {}
+                    });
+                }
+            }
+        }
+
         @Override
         public void write(Object[] record) throws Exception {
             if(recordWriter != null){
@@ -78,7 +108,11 @@
                     } else {
                         ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
                     }
-                    FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
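+                    // Promote this task's temp file to its final name, then
+                    // remove the shared _temporary staging directory.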
+                    if (outputPath != null) {
+                        FileSystem fileSystem = FileSystem.get(conf);
+                        fileSystem.rename(tempOutputFile, finalOutputFile);
+                        fileSystem.delete(tempDir, true);
+                    }
                 } 
             } catch (Exception e) {
                 e.printStackTrace();
@@ -86,30 +120,6 @@
         }
     }
 
-    private static class HadoopSequenceWriter implements IRecordWriter {
-        private Writer writer;
-
-        HadoopSequenceWriter(Writer writer) throws Exception {
-            this.writer = writer;
-        }
-
-        @Override
-        public void close() {
-            try {
-                writer.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void write(Object[] record) throws Exception {
-            Object key = record[0];
-            Object value = record[1];
-            writer.append(key, value);
-        }
-    }
-
     private static final long serialVersionUID = 1L;
     Map<String, String> jobConfMap;
 
@@ -118,43 +128,10 @@
         JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
         conf.setClassLoader(this.getClass().getClassLoader());
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = FileSystem.get(conf);
-        } catch (IOException ioe) {
-            ioe.printStackTrace();
-        }
-        Path tempOutputFile = null;
-        Path finalOutputFile = null;
+        FileSystem fileSystem = FileSystem.get(conf);
         checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
         Object recordWriter  = null;
-        if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
-            boolean isMap = conf.getNumReduceTasks() == 0;
-            TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
-            conf.set("mapred.task.id",taskAttempId.toString());
-            outputPath = new Path(conf.get("mapred.output.dir"));
-            outputTempPath = new Path(outputPath,"_temporary");
-            String suffix =  new String("part-r-00000");
-            suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-            suffix = suffix + index;
-            tempOutputFile = new Path(outputTempPath,new Path("_" + taskAttempId.toString()));
-            if (conf.getNumReduceTasks() == 0 ) {
-                suffix=suffix.replace("-r-", "-m-");
-            }
-            tempOutputFile = new Path(tempOutputFile,suffix);
-            finalOutputFile = new Path(outputPath,suffix);
-            if(conf.getUseNewMapper()){
-                org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
-                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
-            }else {
-               recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
-               @Override
-               public void progress() {}
-               });
-            }
-        }
-        
-        return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
+        return new HadoopFileWriter(recordWriter, index, conf);
     }
     
 
@@ -221,7 +198,7 @@
         int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
         Object outputFormat = null;
         if(conf.getUseNewMapper()) {
-            outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
+            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null).getOutputFormatClass(), conf);
         } else {
             outputFormat = conf.getOutputFormat();
         }
@@ -252,6 +229,7 @@
         super(jobSpec, getOutputSplits(jobConf, numMapTasks));
         this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
         checkIfCanWriteToHDFS(super.splits);
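+        // NOTE: up-front deletion of pre-existing output files is disabled in
+        // this change; see HadoopFileWriter.close() for output promotion.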
+        /*
         FileSystem fs = FileSystem.get(jobConf);
         if (jobConf.get("mapred.output.dir") != null) {
             Path output = new Path(jobConf.get("mapred.output.dir"));
@@ -261,5 +239,6 @@
                 }
             }
         }
+        */
     }
-
+}