Modified HadoopWriterOperatorDescriptor. The operator previously used to create Sequence files by opening FSDataOutputStream. Though this results in correct creation of the sequence file,  it is still better to have it done by obtaining a SequenceFile writer from the outputFormat.

git-svn-id: https://hyracks.googlecode.com/svn/trunk@202 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index d9b41ce..95a4647 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -16,18 +16,13 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,23 +36,18 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
-import edu.uci.ics.hyracks.dataflow.std.file.RecordWriter;
 
 public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
 
-    private static String nullWritableClassName = NullWritable.class.getName();
-
     private  class HadoopFileWriter implements IRecordWriter {
 
         Object recordWriter;
         JobConf conf;
-        final boolean useNewMapReduceLib;
         Path finalOutputFile;
         Path tempOutputFile;
       
@@ -67,12 +57,11 @@
             this.conf = conf;
             this.finalOutputFile = outputFile;
             this.tempOutputFile = tempOutputFile;
-            useNewMapReduceLib = conf.getUseNewMapper();
         }
 
         @Override
         public void write(Object[] record) throws Exception {
-            if (useNewMapReduceLib){
+            if (conf.getUseNewMapper()){
                 ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
             } else {
                 ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
@@ -82,12 +71,12 @@
         @Override
         public void close() {
             try {
-                if (useNewMapReduceLib){
+                if (conf.getUseNewMapper()){
                    ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
                 } else {
                     ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
-                    FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
                 }
+                FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -137,46 +126,34 @@
         Path finalOutputFile = null;
         checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
         Object recordWriter  = null;
-        boolean sequenceFileOutput = false;
         Object outputFormat = null;
+        String taskAttempId = new TaskAttemptID().toString();
+        conf.set("mapred.task.id",taskAttempId);
+        outputPath = new Path(conf.get("mapred.output.dir"));
+        outputTempPath = new Path(outputPath,"_temporary");
+        if(outputPath != null && !fileSystem.exists(outputPath)) {
+            fileSystem.mkdirs(outputTempPath);
+        }
+        String suffix =  new String("part-r-00000");
+        suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+        suffix = suffix + index;
+        tempOutputFile = new Path(outputTempPath,"_" + taskAttempId + "/" + suffix);
+        if (conf.getNumReduceTasks() == 0 ) {
+            suffix.replace("-r-", "-m-");
+        }
+        finalOutputFile = new Path(outputPath,suffix);
         if(conf.getUseNewMapper()){
             org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
-            if(newOutputFormat instanceof org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat) {
-                 sequenceFileOutput = true;
-            }else{
                 recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
-            }    
         }else {
-            if(conf.getOutputFormat() instanceof SequenceFileOutputFormat) {
-                sequenceFileOutput = true;
-            } else {
-                String taskAttempId = new TaskAttemptID().toString();
-                conf.set("mapred.task.id",taskAttempId);
-                outputPath = new Path(conf.get("mapred.output.dir"));
-                outputTempPath = new Path(outputPath,"_temporary");
-                if(outputPath != null && !fileSystem.exists(outputPath)) {
-                    fileSystem.mkdirs(outputTempPath);
-                }
-                String suffix = new String("part-00000");
-                suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
-                suffix = suffix + index;
-                tempOutputFile = new Path(outputTempPath,"_" + taskAttempId + "/" + suffix);
-                finalOutputFile = new Path(outputPath,suffix);
-                recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
-                @Override
-                public void progress() {}
-                });
-            }
+           recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
+           @Override
+           public void progress() {}
+           });
         }
-        if(!sequenceFileOutput) {
-            return new HadoopFileWriter(recordWriter,tempOutputFile,finalOutputFile,conf);
-        } else { 
-            Class keyClass = Class.forName(conf.getOutputKeyClass().getName());
-            Class valueClass = Class.forName(conf.getOutputValueClass().getName());
-            conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-            Writer writer = SequenceFile.createWriter(fileSystem, conf, path, keyClass, valueClass);
-            return new HadoopSequenceWriter(writer);    
-        }   
+        
+        
+        return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
     }