Merged -r 438:524 from trunk into branch.

Notable changes picked up by this merge:
- Bump hyracks-dataflow-hadoop and its Hyracks dependencies from 0.1.5 to 0.1.7-SNAPSHOT.
- HadoopMapperOperatorDescriptor: move mapper construction into initializeMapper() and pass the JobConf into createMapper(conf), so old-API mappers are configured against the fully populated configuration (including split information for self-reading mappers).
- HadoopReducerOperatorDescriptor: add a useAsCombiner constructor flag so the operator can instantiate the job's combiner class instead of its reducer class.
- HadoopWriteOperatorDescriptor: rewrite HadoopFileWriter to create its own task temporary directory under the job output path, write part files there, and rename them to the final output location on close; drop the unused HadoopSequenceWriter and debug output.
- Update Eclipse JDT formatter/code-completion preferences and apply the resulting whitespace and import reordering.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@525 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
index dea84dd..0272d6e 100644
--- a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,15 @@
-#Fri May 20 19:34:05 PDT 2011
+#Thu Jun 02 13:11:16 PDT 2011
eclipse.preferences.version=1
+org.eclipse.jdt.core.codeComplete.argumentPrefixes=
+org.eclipse.jdt.core.codeComplete.argumentSuffixes=
+org.eclipse.jdt.core.codeComplete.fieldPrefixes=
+org.eclipse.jdt.core.codeComplete.fieldSuffixes=
+org.eclipse.jdt.core.codeComplete.localPrefixes=
+org.eclipse.jdt.core.codeComplete.localSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
@@ -14,7 +24,7 @@
org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=49
+org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
@@ -48,18 +58,18 @@
org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=true
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
+org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
+org.eclipse.jdt.core.formatter.comment.format_block_comments=true
org.eclipse.jdt.core.formatter.comment.format_header=false
org.eclipse.jdt.core.formatter.comment.format_html=true
org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
+org.eclipse.jdt.core.formatter.comment.format_line_comments=true
org.eclipse.jdt.core.formatter.comment.format_source_code=true
org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=9999
+org.eclipse.jdt.core.formatter.comment.line_length=80
org.eclipse.jdt.core.formatter.compact_else_if=true
org.eclipse.jdt.core.formatter.continuation_indentation=2
org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
@@ -246,7 +256,7 @@
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=false
+org.eclipse.jdt.core.formatter.join_lines_in_comments=true
org.eclipse.jdt.core.formatter.join_wrapped_lines=true
org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
@@ -257,7 +267,7 @@
org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=false
+org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
org.eclipse.jdt.core.formatter.tabulation.char=space
org.eclipse.jdt.core.formatter.tabulation.size=4
org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
diff --git a/hyracks-dataflow-hadoop/pom.xml b/hyracks-dataflow-hadoop/pom.xml
index 0938b30..250c1af 100644
--- a/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks-dataflow-hadoop/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-hadoop</artifactId>
- <version>0.1.5</version>
+ <version>0.1.7-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.5</version>
+ <version>0.1.7-SNAPSHOT</version>
</parent>
<build>
@@ -27,14 +27,14 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.1.5</version>
+ <version>0.1.7-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.1.5</version>
+ <version>0.1.7-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -54,7 +54,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.1.5</version>
+ <version>0.1.7-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c3167cb..fd549b3 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -71,16 +71,8 @@
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
jobConf = getJobConf();
populateCache(jobConf);
- try {
- mapper = createMapper();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
conf = new JobConf(jobConf);
conf.setClassLoader(jobConf.getClassLoader());
- if (!jobConf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
- }
reporter = createReporter();
}
@@ -143,6 +135,19 @@
}
this.writer = writer;
}
+
+ protected void initializeMapper() throws HyracksDataException {
+ super.initializeMapper();
+ try {
+ mapper = createMapper(conf);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ if (!conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+ }
+ }
+
}
private class ReaderMapperOperator extends MapperBaseOperator {
@@ -167,10 +172,10 @@
} else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
.toString());
- conf.setLong("map.input.start",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
- conf.setLong("map.input.length",
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
+ conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+ .getStart());
+ conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
+ .getLength());
}
} catch (Exception e) {
e.printStackTrace();
@@ -180,10 +185,22 @@
}
}
+ protected void initializeMapper() throws HyracksDataException {
+ super.initializeMapper();
+ updateConfWithSplit(conf);
+ try {
+ mapper = createMapper(conf);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ if (!conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
+ }
+ }
+
public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
try {
initializeMapper();
- updateConfWithSplit(conf);
conf.setClassLoader(this.getClass().getClassLoader());
Object reader;
Object key = null;
@@ -209,7 +226,7 @@
};;;
OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
- .getOutputCommitter(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+ .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
StatusReporter statusReporter = new StatusReporter() {
@Override
public void setStatus(String arg0) {
@@ -229,7 +246,7 @@
return null;
}
};;;
- context = new org.apache.hadoop.mapreduce.Mapper().new Context(jobConf, new TaskAttemptID(),
+ context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
newReader, recordWriter, outputCommitter, statusReporter,
(org.apache.hadoop.mapreduce.InputSplit) inputSplit);
newReader.initialize((org.apache.hadoop.mapreduce.InputSplit) inputSplit, context);
@@ -298,9 +315,9 @@
String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
try {
if (hadoopClassFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
- (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+ .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
+ .forName(mapOutputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -312,21 +329,21 @@
return recordDescriptor;
}
- private Object createMapper() throws Exception {
+ private Object createMapper(JobConf conf) throws Exception {
Object mapper;
if (mapperClass != null) {
- return ReflectionUtils.newInstance(mapperClass, jobConf);
+ return ReflectionUtils.newInstance(mapperClass, conf);
} else {
String mapperClassName = null;
if (jobConf.getUseNewMapper()) {
- JobContext jobContext = new JobContext(jobConf, null);
+ JobContext jobContext = new JobContext(conf, null);
mapperClass = jobContext.getMapperClass();
mapperClassName = mapperClass.getName();
} else {
- mapperClass = super.getJobConf().getMapperClass();
+ mapperClass = conf.getMapperClass();
mapperClassName = mapperClass.getName();
}
- mapper = getHadoopClassFactory().createMapper(mapperClassName, jobConf);
+ mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
}
return mapper;
}
@@ -344,8 +361,8 @@
} else {
Class inputFormatClass = conf.getInputFormat().getClass();
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
- super.createReporter());
+ return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
+ .createReporter());
}
}
@@ -383,8 +400,8 @@
}
return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
} else {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
- recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+ .getInputRecordDescriptor(this.odId, 0));
}
} catch (Exception e) {
throw new HyracksDataException(e);
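For reference, the ordering that the mapper changes establish can be summarized outside the diff. The sketch below is illustrative only: MapperInitOrderSketch and its abstract methods are stand-ins mirroring MapperBaseOperator/ReaderMapperOperator from the patch, not the real Hyracks classes. It shows why createMapper(conf) now runs inside initializeMapper(), after the split-specific keys have been pushed into the JobConf, so that old-API mappers see the final configuration in configure().

// Illustrative sketch only: mirrors the initialization order introduced by this patch,
// using stand-in names rather than the actual Hyracks operator classes.
abstract class MapperInitOrderSketch {
    protected org.apache.hadoop.mapred.JobConf conf;
    protected Object mapper;

    // Stand-in for HadoopMapperOperatorDescriptor.createMapper(JobConf).
    protected abstract Object createMapper(org.apache.hadoop.mapred.JobConf conf) throws Exception;

    // Stand-in for ReaderMapperOperator.updateConfWithSplit(JobConf).
    protected abstract void updateConfWithSplit(org.apache.hadoop.mapred.JobConf conf);

    protected void initializeMapper() throws Exception {
        // Split-derived keys (map.input.file, map.input.start, map.input.length) are set first...
        updateConfWithSplit(conf);
        // ...so the mapper is instantiated and configured against the fully populated JobConf.
        mapper = createMapper(conf);
        if (!conf.getUseNewMapper()) {
            ((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
        }
    }
}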
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 2d44c79..5857f36 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -23,13 +23,14 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -310,11 +311,13 @@
private static final long serialVersionUID = 1L;
private Class reducerClass;
private IComparatorFactory comparatorFactory;
+ private boolean useAsCombiner = false;
public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
- IHadoopClassFactory classFactory) {
+ IHadoopClassFactory classFactory, boolean useAsCombiner) {
super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
this.comparatorFactory = comparatorFactory;
+ this.useAsCombiner = useAsCombiner;
}
private Object createReducer() throws Exception {
@@ -322,12 +325,22 @@
return ReflectionUtils.newInstance(reducerClass, getJobConf());
} else {
Object reducer;
- if (getJobConf().getUseNewReducer()) {
- JobContext jobContext = new JobContext(getJobConf(), null);
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
- .getReducerClass();
+ if (!useAsCombiner) {
+ if (getJobConf().getUseNewReducer()) {
+ JobContext jobContext = new JobContext(getJobConf(), null);
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+ .getReducerClass();
+ } else {
+ reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+ }
} else {
- reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
+ if (getJobConf().getUseNewReducer()) {
+ JobContext jobContext = new JobContext(getJobConf(), null);
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+ .getCombinerClass();
+ } else {
+ reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
+ }
}
reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
return reducer;
@@ -382,9 +395,8 @@
RecordDescriptor recordDescriptor = null;
try {
if (classFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(outputKeyClassName),
- (Class<? extends Writable>) Class.forName(outputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
+ .forName(outputKeyClassName), (Class<? extends Writable>) Class.forName(outputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) classFactory.loadClass(outputKeyClassName),
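Because the reducer descriptor's constructor gains a trailing boolean, existing call sites need updating. A minimal wiring sketch, assuming the job specification, JobConf, and factories are created elsewhere; the helper class and method names, and the IComparatorFactory import path, are assumptions and not part of this patch:

import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.data.IComparatorFactory; // import path is an assumption
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;

public class ReducerWiringSketch {
    // Hypothetical helpers showing the new trailing flag.
    public static HadoopReducerOperatorDescriptor asReducer(JobSpecification spec, JobConf conf,
            IComparatorFactory comparators, IHadoopClassFactory classFactory) {
        // false: createReducer() resolves conf.getReducerClass(), the pre-patch behavior.
        return new HadoopReducerOperatorDescriptor(spec, conf, comparators, classFactory, false);
    }

    public static HadoopReducerOperatorDescriptor asCombiner(JobSpecification spec, JobConf conf,
            IComparatorFactory comparators, IHadoopClassFactory classFactory) {
        // true: createReducer() resolves conf.getCombinerClass() instead.
        return new HadoopReducerOperatorDescriptor(spec, conf, comparators, classFactory, true);
    }
}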
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 4dcfa61..2154f5a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -21,14 +21,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
@@ -49,63 +48,81 @@
JobConf conf;
Path finalOutputFile;
Path tempOutputFile;
+ Path tempDir;
- HadoopFileWriter(Object recordWriter, Path tempOutputFile, Path outputFile, JobConf conf) {
+ HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
this.recordWriter = recordWriter;
this.conf = conf;
- this.finalOutputFile = outputFile;
- this.tempOutputFile = tempOutputFile;
+ initialize(index, conf);
+ }
+
+ private void initialize(int index, JobConf conf) throws Exception {
+ if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
+ boolean isMap = conf.getNumReduceTasks() == 0;
+ TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
+ conf.set("mapred.task.id", taskAttempId.toString());
+ String suffix = new String("part-00000");
+ suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
+ suffix = suffix + index;
+ outputPath = new Path(conf.get("mapred.output.dir"));
+ tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tempDir.getFileSystem(conf);
+ if (!fileSys.mkdirs(tempDir)) {
+ throw new IOException("Mkdirs failed to create " + tempDir.toString());
+ }
+ tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
+ tempOutputFile = new Path(tempOutputFile, suffix);
+ finalOutputFile = new Path(outputPath, suffix);
+ if (conf.getUseNewMapper()) {
+ org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(
+ conf, null);
+ org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
+ .newInstance(jobContext.getOutputFormatClass(), conf);
+ recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+ } else {
+ recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
+ new Progressable() {
+ @Override
+ public void progress() {
+ }
+ });
+ }
+ }
}
@Override
public void write(Object[] record) throws Exception {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+ if (recordWriter != null) {
+ if (conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
+ }
}
}
@Override
public void close() {
try {
- if (conf.getUseNewMapper()) {
- ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
- new TaskAttemptID()));
- } else {
- ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
+ if (recordWriter != null) {
+ if (conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
+ new TaskAttemptID()));
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
+ }
+ if (outputPath != null) {
+ FileSystem fileSystem = FileSystem.get(conf);
+ fileSystem.rename(tempOutputFile, finalOutputFile);
+ fileSystem.delete(tempDir, true);
+ }
}
- FileSystem.get(conf).rename(tempOutputFile, finalOutputFile);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- private static class HadoopSequenceWriter implements IRecordWriter {
- private Writer writer;
-
- HadoopSequenceWriter(Writer writer) throws Exception {
- this.writer = writer;
- }
-
- @Override
- public void close() {
- try {
- writer.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void write(Object[] record) throws Exception {
- Object key = record[0];
- Object value = record[1];
- writer.append(key, value);
- }
- }
-
private static final long serialVersionUID = 1L;
Map<String, String> jobConfMap;
@@ -114,46 +131,9 @@
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
conf.setClassLoader(this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- FileSystem fileSystem = null;
- try {
- fileSystem = FileSystem.get(conf);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
- Path tempOutputFile = null;
- Path finalOutputFile = null;
- checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
+ FileSystem fileSystem = FileSystem.get(conf);
Object recordWriter = null;
- Object outputFormat = null;
- String taskAttempId = new TaskAttemptID().toString();
- conf.set("mapred.task.id", taskAttempId);
- outputPath = new Path(conf.get("mapred.output.dir"));
- outputTempPath = new Path(outputPath, "_temporary");
- if (outputPath != null && !fileSystem.exists(outputPath)) {
- fileSystem.mkdirs(outputTempPath);
- }
- String suffix = new String("part-r-00000");
- suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
- suffix = suffix + index;
- tempOutputFile = new Path(outputTempPath, "_" + taskAttempId + "/" + suffix);
- if (conf.getNumReduceTasks() == 0) {
- suffix.replace("-r-", "-m-");
- }
- finalOutputFile = new Path(outputPath, suffix);
- if (conf.getUseNewMapper()) {
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
- .newInstance((new JobContext(conf, null)).getOutputFormatClass(), conf);
- recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
- } else {
- recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, suffix, new Progressable() {
- @Override
- public void progress() {
- }
- });
- }
-
- return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
+ return new HadoopFileWriter(recordWriter, index, conf);
}
Path outputPath;
@@ -219,7 +199,8 @@
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
Object outputFormat = null;
if (conf.getUseNewMapper()) {
- outputFormat = ReflectionUtils.newInstance(new JobContext(conf, null).getOutputFormatClass(), conf);
+ outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null)
+ .getOutputFormatClass(), conf);
} else {
outputFormat = conf.getOutputFormat();
}
@@ -234,18 +215,15 @@
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
- System.out.println("absolute path:" + absolutePath);
for (int index = 0; index < numOutputters; index++) {
String suffix = new String("part-00000");
suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
suffix = suffix + index;
String outputPath = absolutePath + "/" + suffix;
- System.out.println("output path :" + outputPath);
outputFileSplits[index] = new FileSplit("localhost", outputPath);
}
return outputFileSplits;
}
-
}
public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
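The rewritten HadoopFileWriter builds a task-scoped temporary path under the job output directory and promotes the part file when it is closed. The sketch below isolates just that commit step using plain Hadoop filesystem calls; OutputCommitSketch, attemptId, and suffix are illustrative stand-ins for the values the writer derives from the partition index, not code from this patch.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;

// Minimal sketch of the temp-then-rename commit performed by the rewritten HadoopFileWriter.
public class OutputCommitSketch {
    public static void commit(JobConf conf, String attemptId, String suffix) throws IOException {
        Path outputPath = new Path(conf.get("mapred.output.dir"));
        Path tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); // "_temporary"
        Path tempOutputFile = new Path(new Path(tempDir, "_" + attemptId), suffix);
        Path finalOutputFile = new Path(outputPath, suffix);

        FileSystem fs = FileSystem.get(conf);
        // The record writer has already written its output to tempOutputFile at this point.
        fs.rename(tempOutputFile, finalOutputFile);
        fs.delete(tempDir, true);
    }
}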
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index f4dbe4a..a9f3a71 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -17,19 +17,18 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-
public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
@Override
- public Object createMapper(String mapClassName,JobConf conf) throws Exception {
+ public Object createMapper(String mapClassName, JobConf conf) throws Exception {
Class clazz = loadClass(mapClassName);
return ReflectionUtils.newInstance(clazz, conf);
}
@Override
- public Object createReducer(String reduceClassName,JobConf conf) throws Exception {
+ public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
Class clazz = loadClass(reduceClassName);
- return ReflectionUtils.newInstance(clazz, conf);
+ return ReflectionUtils.newInstance(clazz, conf);
}
@Override
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index 22db448..c2ffeeb 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -20,9 +20,9 @@
public interface IHadoopClassFactory extends Serializable {
- public Object createMapper(String mapClassName,JobConf conf) throws Exception;
+ public Object createMapper(String mapClassName, JobConf conf) throws Exception;
- public Object createReducer(String reduceClassName,JobConf conf) throws Exception;
+ public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
public Class loadClass(String className) throws Exception;
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index 29239e3..9226c29 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -36,15 +36,15 @@
isClasses = new Class[inputSplits.length];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
- if(conf.getUseNewMapper()){
+ if (conf.getUseNewMapper()) {
for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
- ((Writable)inputSplits[i]).write(dos);
+ isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
+ ((Writable) inputSplits[i]).write(dos);
}
} else {
for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
- ((Writable)inputSplits[i]).write(dos);
+ isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
+ ((Writable) inputSplits[i]).write(dos);
}
}
dos.close();
@@ -52,17 +52,16 @@
}
- public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
- IOException {
+ public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
Object[] splits = new Object[isClasses.length];
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
for (int i = 0; i < splits.length; ++i) {
splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
- if(jobConf.getUseNewMapper()){
- ((Writable)splits[i]).readFields(dis);
- }else {
- ((Writable)splits[i]).readFields(dis);
- }
+ if (jobConf.getUseNewMapper()) {
+ ((Writable) splits[i]).readFields(dis);
+ } else {
+ ((Writable) splits[i]).readFields(dis);
+ }
}
return splits;
}
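For context, InputSplitsProxy relies on the standard Writable round trip shown above: each split writes itself to a byte stream, and is later rebuilt by instantiating its recorded class and calling readFields. A minimal, self-contained illustration of that pattern with a plain Hadoop Writable (Text is used here purely as an example; the proxy applies the same steps to split objects):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class WritableRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // Serialize: the same pattern the proxy uses for each input split.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        Writable original = new Text("hello");
        Class<? extends Writable> clazz = original.getClass();
        original.write(dos);
        dos.close();

        // Deserialize: instantiate the recorded class, then readFields from the stream.
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        Writable copy = ReflectionUtils.newInstance(clazz, null); // the proxy passes the JobConf here
        copy.readFields(dis);
        System.out.println(copy); // prints "hello"
    }
}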