refactored code in HadoopWriteOperatorDescriptor: per-task output setup (task attempt id, temporary and final output paths, record writer creation) now lives in HadoopFileWriter.initialize()

The writer stages its part file under the output directory's _temporary subdirectory and deletes that directory after promoting the file in close(). The HadoopSequenceWriter inner class is removed and the output-path handling in the constructor is commented out.
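For review, a rough sketch of the per-task output staging HadoopFileWriter now performs (illustrative only: the class and method below are made up, and the real code derives the attempt id and the zero-padded part-file name from the operator's partition index):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputStagingSketch {
        // Stage a part file under <output>/_temporary/_<attemptId>/ and promote it when the task finishes.
        public static void stageAndPromote(JobConf conf, String attemptId, String partName) throws IOException {
            Path outputPath = new Path(conf.get("mapred.output.dir"));
            Path tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); // "_temporary"
            FileSystem fs = tempDir.getFileSystem(conf);
            if (!fs.mkdirs(tempDir)) {
                throw new IOException("Mkdirs failed to create " + tempDir);
            }
            Path tempOutputFile = new Path(new Path(tempDir, "_" + attemptId), partName);
            Path finalOutputFile = new Path(outputPath, partName);
            // ... records go to tempOutputFile through the configured OutputFormat ...
            fs.rename(tempOutputFile, finalOutputFile); // done in HadoopFileWriter.close()
            fs.delete(tempDir, true);                   // then the staging directory is discarded
        }
    }
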
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@475 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 9f3d532..30584b7 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -28,11 +28,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -49,15 +49,45 @@
JobConf conf;
Path finalOutputFile;
Path tempOutputFile;
-
+ Path tempDir;
- HadoopFileWriter(Object recordWriter,Path tempOutputFile,Path outputFile,JobConf conf) {
+ HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
this.recordWriter = recordWriter;
this.conf = conf;
- this.finalOutputFile = outputFile;
- this.tempOutputFile = tempOutputFile;
+ initialize(index, conf);
}
+
+ private void initialize(int index, JobConf conf) throws Exception {
+            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
+ boolean isMap = conf.getNumReduceTasks() == 0;
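+                // fabricate a task attempt id from this partition's index and publish it via mapred.task.id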
+                TaskAttemptID taskAttemptId = new TaskAttemptID("0", index, isMap, index, index);
+                conf.set("mapred.task.id", taskAttemptId.toString());
+                String suffix = "part-00000";
+                suffix = suffix.substring(0, suffix.length() - ("" + index).length());
+ suffix = suffix + index;
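+                // stage the part file under <mapred.output.dir>/_temporary/_<attempt-id>/ and record its final destination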
+ outputPath = new Path(conf.get("mapred.output.dir"));
+ tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tempDir.getFileSystem(conf);
+ if (!fileSys.mkdirs(tempDir)) {
+ throw new IOException("Mkdirs failed to create " + tempDir.toString());
+ }
+                tempOutputFile = new Path(tempDir, new Path("_" + taskAttemptId.toString()));
+                tempOutputFile = new Path(tempOutputFile, suffix);
+ finalOutputFile = new Path(outputPath,suffix);
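+                // obtain the record writer through the new mapreduce API when configured, otherwise through the old mapred API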
+                if (conf.getUseNewMapper()) {
+                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(conf, null);
+                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils.newInstance(jobContext.getOutputFormatClass(), conf);
+                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttemptId));
+                } else {
+                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix, new Progressable() {
+ @Override
+ public void progress() {}
+ });
+ }
+ }
+ }
+
@Override
public void write(Object[] record) throws Exception {
if(recordWriter != null){
@@ -78,7 +108,11 @@
} else {
((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
}
- FileSystem.get(conf).rename( tempOutputFile, finalOutputFile);
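+                    // promote the temporary part file to its final name, then drop the _temporary staging directory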
+ if (outputPath != null) {
+ FileSystem fileSystem = FileSystem.get(conf);
+ fileSystem.rename(tempOutputFile, finalOutputFile);
+                        fileSystem.delete(tempDir, true);
+ }
}
} catch (Exception e) {
e.printStackTrace();
@@ -86,30 +120,6 @@
}
}
- private static class HadoopSequenceWriter implements IRecordWriter {
- private Writer writer;
-
- HadoopSequenceWriter(Writer writer) throws Exception {
- this.writer = writer;
- }
-
- @Override
- public void close() {
- try {
- writer.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void write(Object[] record) throws Exception {
- Object key = record[0];
- Object value = record[1];
- writer.append(key, value);
- }
- }
-
private static final long serialVersionUID = 1L;
Map<String, String> jobConfMap;
@@ -118,43 +128,10 @@
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
conf.setClassLoader(this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- FileSystem fileSystem = null;
- try {
- fileSystem = FileSystem.get(conf);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- Path tempOutputFile = null;
- Path finalOutputFile = null;
+ FileSystem fileSystem = FileSystem.get(conf);
checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
Object recordWriter = null;
- if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
- boolean isMap = conf.getNumReduceTasks() == 0;
- TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
- conf.set("mapred.task.id",taskAttempId.toString());
- outputPath = new Path(conf.get("mapred.output.dir"));
- outputTempPath = new Path(outputPath,"_temporary");
- String suffix = new String("part-r-00000");
- suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
- suffix = suffix + index;
- tempOutputFile = new Path(outputTempPath,new Path("_" + taskAttempId.toString()));
- if (conf.getNumReduceTasks() == 0 ) {
- suffix=suffix.replace("-r-", "-m-");
- }
- tempOutputFile = new Path(tempOutputFile,suffix);
- finalOutputFile = new Path(outputPath,suffix);
- if(conf.getUseNewMapper()){
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
- recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
- }else {
- recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf,suffix, new Progressable() {
- @Override
- public void progress() {}
- });
- }
- }
-
- return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
+ return new HadoopFileWriter(recordWriter, index, conf);
}
@@ -221,7 +198,7 @@
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
Object outputFormat = null;
if(conf.getUseNewMapper()) {
- outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
+            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null).getOutputFormatClass(), conf);
} else {
outputFormat = conf.getOutputFormat();
}
@@ -252,6 +229,7 @@
super(jobSpec, getOutputSplits(jobConf, numMapTasks));
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
checkIfCanWriteToHDFS(super.splits);
+ /*
FileSystem fs = FileSystem.get(jobConf);
if (jobConf.get("mapred.output.dir") != null) {
Path output = new Path(jobConf.get("mapred.output.dir"));
@@ -261,5 +239,6 @@
}
}
}
+ */
}
-
+}