1) Changed pom.xml for hadoopcompatapp so that the working directory is set appropriately for the CC and NCs. 2) Refactored code.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_hadoop_compat_changes@479 123451ca-8445-de46-9d55-352943316053
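The hadoopcompatapp pom.xml hunk itself is not included in this excerpt, so the exact working-directory change is not shown below. As an illustrative sketch only, a working directory for a forked CC/NC process can be pinned in a pom.xml roughly as follows; the plugin used here (exec-maven-plugin), its workingDirectory parameter, and the driver class name are assumptions for illustration, not the commit's actual configuration.

    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>start-cc</id>
          <phase>pre-integration-test</phase>
          <goals>
            <goal>exec</goal>
          </goals>
          <configuration>
            <!-- Pin the directory the CC runs in; NC executions would set workingDirectory the same way. -->
            <workingDirectory>${project.build.directory}/hadoopcompatapp</workingDirectory>
            <executable>java</executable>
            <arguments>
              <argument>-cp</argument>
              <argument>${project.build.outputDirectory}</argument>
              <!-- Hypothetical placeholder main class, not necessarily the real CC driver. -->
              <argument>com.example.ClusterControllerMain</argument>
            </arguments>
          </configuration>
        </execution>
      </executions>
    </plugin>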
diff --git a/hyracks-api/.settings/org.eclipse.jdt.core.prefs b/hyracks-api/.settings/org.eclipse.jdt.core.prefs
index a0e106b..6b7a0fc 100644
--- a/hyracks-api/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-api/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Thu Jul 29 01:10:06 PDT 2010
+#Thu Jun 02 12:55:19 PDT 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-cli/.classpath b/hyracks-cli/.classpath
index ba0bb5a..1f3c1ff 100644
--- a/hyracks-cli/.classpath
+++ b/hyracks-cli/.classpath
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="src" path="target/generated-sources/javacc"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
index 7404d54..0272d6e 100644
--- a/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-hadoop/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,15 @@
-#Tue Nov 02 17:09:03 PDT 2010
+#Thu Jun 02 13:11:16 PDT 2011
eclipse.preferences.version=1
+org.eclipse.jdt.core.codeComplete.argumentPrefixes=
+org.eclipse.jdt.core.codeComplete.argumentSuffixes=
+org.eclipse.jdt.core.codeComplete.fieldPrefixes=
+org.eclipse.jdt.core.codeComplete.fieldSuffixes=
+org.eclipse.jdt.core.codeComplete.localPrefixes=
+org.eclipse.jdt.core.codeComplete.localSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
+org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
@@ -14,7 +24,7 @@
org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=49
+org.eclipse.jdt.core.formatter.alignment_for_enum_constants=48
org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
@@ -48,18 +58,18 @@
org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=true
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
+org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
+org.eclipse.jdt.core.formatter.comment.format_block_comments=true
org.eclipse.jdt.core.formatter.comment.format_header=false
org.eclipse.jdt.core.formatter.comment.format_html=true
org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
+org.eclipse.jdt.core.formatter.comment.format_line_comments=true
org.eclipse.jdt.core.formatter.comment.format_source_code=true
org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=9999
+org.eclipse.jdt.core.formatter.comment.line_length=80
org.eclipse.jdt.core.formatter.compact_else_if=true
org.eclipse.jdt.core.formatter.continuation_indentation=2
org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
@@ -246,7 +256,7 @@
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=false
+org.eclipse.jdt.core.formatter.join_lines_in_comments=true
org.eclipse.jdt.core.formatter.join_wrapped_lines=true
org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
@@ -257,7 +267,7 @@
org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=false
+org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
org.eclipse.jdt.core.formatter.tabulation.char=space
org.eclipse.jdt.core.formatter.tabulation.size=4
org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index a1c14c5..fd549b3 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -135,7 +135,7 @@
}
this.writer = writer;
}
-
+
protected void initializeMapper() throws HyracksDataException {
super.initializeMapper();
try {
@@ -146,8 +146,7 @@
if (!conf.getUseNewMapper()) {
((org.apache.hadoop.mapred.Mapper) mapper).configure(conf);
}
- }
-
+ }
}
@@ -344,7 +343,7 @@
mapperClass = conf.getMapperClass();
mapperClassName = mapperClass.getName();
}
- mapper = getHadoopClassFactory().createMapper(mapperClassName,conf);
+ mapper = getHadoopClassFactory().createMapper(mapperClassName, conf);
}
return mapper;
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 195b7c4..5857f36 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -60,64 +60,66 @@
private Object reducer;
private DataWritingOutputCollector<K3, V3> output;
private Reporter reporter;
- private ReducerContext reducerContext;
+ private ReducerContext reducerContext;
RawKeyValueIterator rawKeyValueIterator = new RawKeyValueIterator() {
-
+
@Override
public boolean next() throws IOException {
return false;
}
-
+
@Override
public DataInputBuffer getValue() throws IOException {
return null;
}
-
+
@Override
public Progress getProgress() {
return null;
}
-
+
@Override
public DataInputBuffer getKey() throws IOException {
return null;
}
-
+
@Override
public void close() throws IOException {
-
+
}
};
-
-
+
class ReducerContext extends org.apache.hadoop.mapreduce.Reducer.Context {
private HadoopReducerOperatorDescriptor.ValueIterator iterator;
-
+
@SuppressWarnings("unchecked")
- ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException{
-
- reducer.super(conf,new TaskAttemptID(),rawKeyValueIterator,null,null,null,null,null,null,Class.forName("org.apache.hadoop.io.NullWritable"),Class.forName("org.apache.hadoop.io.NullWritable"));
+ ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ reducer.super(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, null, null, Class
+ .forName("org.apache.hadoop.io.NullWritable"), Class
+ .forName("org.apache.hadoop.io.NullWritable"));
}
-
- public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
+
+ public void setIterator(HadoopReducerOperatorDescriptor.ValueIterator iter) {
iterator = iter;
}
-
+
@Override
public Iterable<V2> getValues() throws IOException, InterruptedException {
- return new Iterable<V2>() {
- @Override
- public Iterator<V2> iterator() {
- return iterator;
- }
- };
+ return new Iterable<V2>() {
+ @Override
+ public Iterator<V2> iterator() {
+ return iterator;
+ }
+ };
}
-
+
/** Start processing next unique key. */
@Override
- public boolean nextKey() throws IOException,InterruptedException {
+ public boolean nextKey() throws IOException, InterruptedException {
boolean hasMore = iterator.hasNext();
- if(hasMore){
+ if (hasMore) {
nextKeyValue();
}
return hasMore;
@@ -133,26 +135,25 @@
}
public Object getCurrentKey() {
- return iterator.getKey();
+ return iterator.getKey();
}
@Override
public Object getCurrentValue() {
- return iterator.getValue();
+ return iterator.getValue();
}
-
+
/**
* Generate an output key/value pair.
*/
@Override
- public void write(Object key, Object value
- ) throws IOException, InterruptedException {
- output.collect(key, value);
+ public void write(Object key, Object value) throws IOException, InterruptedException {
+ output.collect(key, value);
}
}
-
- public ReducerAggregator(Object reducer) throws HyracksDataException{
+
+ public ReducerAggregator(Object reducer) throws HyracksDataException {
this.reducer = reducer;
initializeReducer();
output = new DataWritingOutputCollector<K3, V3>();
@@ -201,17 +202,17 @@
i.reset(reader);
output.setWriter(writer);
try {
- if(jobConf.getUseNewReducer()){
+ if (jobConf.getUseNewReducer()) {
try {
reducerContext.setIterator(i);
- ((org.apache.hadoop.mapreduce.Reducer)reducer).run(reducerContext);
+ ((org.apache.hadoop.mapreduce.Reducer) reducer).run(reducerContext);
} catch (InterruptedException e) {
e.printStackTrace();
throw new HyracksDataException(e);
}
- } else {
- ((org.apache.hadoop.mapred.Reducer)reducer).reduce(i.getKey(), i, output, reporter);
- }
+ } else {
+ ((org.apache.hadoop.mapred.Reducer) reducer).reduce(i.getKey(), i, output, reporter);
+ }
} catch (IOException e) {
e.printStackTrace();
}
@@ -221,28 +222,28 @@
public void close() throws HyracksDataException {
// -- - close - --
try {
- if(!jobConf.getUseNewMapper()) {
- ((org.apache.hadoop.mapred.Reducer)reducer).close();
- }
+ if (!jobConf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapred.Reducer) reducer).close();
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
-
+
private void initializeReducer() throws HyracksDataException {
jobConf.setClassLoader(this.getClass().getClassLoader());
- if(!jobConf.getUseNewReducer()) {
- ((org.apache.hadoop.mapred.Reducer)reducer).configure(getJobConf());
+ if (!jobConf.getUseNewReducer()) {
+ ((org.apache.hadoop.mapred.Reducer) reducer).configure(getJobConf());
} else {
try {
- reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer)reducer,jobConf);
+ reducerContext = new ReducerContext((org.apache.hadoop.mapreduce.Reducer) reducer, jobConf);
} catch (IOException e) {
e.printStackTrace();
throw new HyracksDataException(e);
} catch (InterruptedException e) {
e.printStackTrace();
throw new HyracksDataException(e);
- } catch (RuntimeException e){
+ } catch (RuntimeException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
@@ -259,7 +260,7 @@
public K2 getKey() {
return key;
}
-
+
public V2 getValue() {
return value;
}
@@ -313,7 +314,7 @@
private boolean useAsCombiner = false;
public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
- IHadoopClassFactory classFactory,boolean useAsCombiner) {
+ IHadoopClassFactory classFactory, boolean useAsCombiner) {
super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
this.comparatorFactory = comparatorFactory;
this.useAsCombiner = useAsCombiner;
@@ -324,22 +325,24 @@
return ReflectionUtils.newInstance(reducerClass, getJobConf());
} else {
Object reducer;
- if(!useAsCombiner) {
- if(getJobConf().getUseNewReducer()){
+ if (!useAsCombiner) {
+ if (getJobConf().getUseNewReducer()) {
JobContext jobContext = new JobContext(getJobConf(), null);
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getReducerClass();
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+ .getReducerClass();
} else {
reducerClass = (Class<? extends Reducer>) getJobConf().getReducerClass();
}
} else {
- if(getJobConf().getUseNewReducer()){
+ if (getJobConf().getUseNewReducer()) {
JobContext jobContext = new JobContext(getJobConf(), null);
- reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?,?,?,?>> )jobContext.getCombinerClass();
+ reducerClass = (Class<? extends org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?>>) jobContext
+ .getCombinerClass();
} else {
reducerClass = (Class<? extends Reducer>) getJobConf().getCombinerClass();
}
}
- reducer = getHadoopClassFactory().createReducer(reducerClass.getName(),getJobConf());
+ reducer = getHadoopClassFactory().createReducer(reducerClass.getName(), getJobConf());
return reducer;
}
}
@@ -377,18 +380,18 @@
}
public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
- String outputKeyClassName =null;
+ String outputKeyClassName = null;
String outputValueClassName = null;
-
- if(conf.getUseNewMapper()) {
- JobContext context = new JobContext(conf,null);
+
+ if (conf.getUseNewMapper()) {
+ JobContext context = new JobContext(conf, null);
outputKeyClassName = context.getOutputKeyClass().getName();
outputValueClassName = context.getOutputValueClass().getName();
} else {
outputKeyClassName = conf.getOutputKeyClass().getName();
outputValueClassName = conf.getOutputValueClass().getName();
}
-
+
RecordDescriptor recordDescriptor = null;
try {
if (classFactory == null) {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index 30584b7..2154f5a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -32,7 +32,6 @@
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.FileOutputCommitter;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -43,58 +42,61 @@
public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
- private class HadoopFileWriter implements IRecordWriter {
+ private class HadoopFileWriter implements IRecordWriter {
Object recordWriter;
JobConf conf;
Path finalOutputFile;
Path tempOutputFile;
- Path tempDir;
-
+ Path tempDir;
+
HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
this.recordWriter = recordWriter;
this.conf = conf;
- initialize(index, conf);
+ initialize(index, conf);
}
-
private void initialize(int index, JobConf conf) throws Exception {
- if(! (conf.getOutputFormat() instanceof NullOutputFormat)) {
+ if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
boolean isMap = conf.getNumReduceTasks() == 0;
- TaskAttemptID taskAttempId = new TaskAttemptID("0",index,isMap,index,index);
- conf.set("mapred.task.id",taskAttempId.toString());
- String suffix = new String("part-00000");
+ TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
+ conf.set("mapred.task.id", taskAttempId.toString());
+ String suffix = new String("part-00000");
suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
suffix = suffix + index;
outputPath = new Path(conf.get("mapred.output.dir"));
- tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tempDir.getFileSystem(conf);
- if (!fileSys.mkdirs(tempDir)) {
- throw new IOException("Mkdirs failed to create " + tempDir.toString());
- }
- tempOutputFile = new Path(tempDir,new Path("_" + taskAttempId.toString()));
- tempOutputFile = new Path(tempOutputFile,suffix);
- finalOutputFile = new Path(outputPath,suffix);
- if(conf.getUseNewMapper()){
- org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(conf,null);
- org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance(jobContext.getOutputFormatClass(),conf);
- recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
- }else {
- recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf,suffix, new Progressable() {
- @Override
- public void progress() {}
- });
+ tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tempDir.getFileSystem(conf);
+ if (!fileSys.mkdirs(tempDir)) {
+ throw new IOException("Mkdirs failed to create " + tempDir.toString());
}
- }
- }
+ tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
+ tempOutputFile = new Path(tempOutputFile, suffix);
+ finalOutputFile = new Path(outputPath, suffix);
+ if (conf.getUseNewMapper()) {
+ org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(
+ conf, null);
+ org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
+ .newInstance(jobContext.getOutputFormatClass(), conf);
+ recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
+ } else {
+ recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
+ new Progressable() {
+ @Override
+ public void progress() {
+ }
+ });
+ }
+ }
+ }
@Override
public void write(Object[] record) throws Exception {
- if(recordWriter != null){
- if (conf.getUseNewMapper()){
- ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+ if (recordWriter != null) {
+ if (conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
} else {
- ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+ ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
}
}
}
@@ -102,18 +104,19 @@
@Override
public void close() {
try {
- if(recordWriter != null) {
- if (conf.getUseNewMapper()){
- ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+ if (recordWriter != null) {
+ if (conf.getUseNewMapper()) {
+ ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
+ new TaskAttemptID()));
} else {
- ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+ ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
}
- if (outputPath != null) {
- FileSystem fileSystem = FileSystem.get(conf);
+ if (outputPath != null) {
+ FileSystem fileSystem = FileSystem.get(conf);
fileSystem.rename(tempOutputFile, finalOutputFile);
- fileSystem.delete(tempDir,true);
- }
- }
+ fileSystem.delete(tempDir, true);
+ }
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -129,53 +132,51 @@
conf.setClassLoader(this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
FileSystem fileSystem = FileSystem.get(conf);
- checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
- Object recordWriter = null;
+ Object recordWriter = null;
return new HadoopFileWriter(recordWriter, index, conf);
}
-
Path outputPath;
Path outputTempPath;
-
+
protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
- @Override
- public void incrCounter(Enum<?> key, long amount) {
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
- }
+ }
- @Override
- public void incrCounter(String group, String counter, long amount) {
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
- }
+ }
- @Override
- public void progress() {
+ @Override
+ public void progress() {
- }
+ }
- @Override
- public void setStatus(String status) {
+ @Override
+ public void setStatus(String status) {
- }
- };
-}
+ }
+ };
+ }
private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
@@ -197,8 +198,9 @@
private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
Object outputFormat = null;
- if(conf.getUseNewMapper()) {
- outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf,null).getOutputFormatClass(), conf);
+ if (conf.getUseNewMapper()) {
+ outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null)
+ .getOutputFormatClass(), conf);
} else {
outputFormat = conf.getOutputFormat();
}
@@ -222,23 +224,11 @@
}
return outputFileSplits;
}
-
}
public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
super(jobSpec, getOutputSplits(jobConf, numMapTasks));
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
checkIfCanWriteToHDFS(super.splits);
- /*
- FileSystem fs = FileSystem.get(jobConf);
- if (jobConf.get("mapred.output.dir") != null) {
- Path output = new Path(jobConf.get("mapred.output.dir"));
- Path outputTemp = new Path(output,"_temporary");
- if(output != null && !fs.exists(outputTemp)) {
- fs.mkdirs(outputTemp);
- }
- }
- }
- */
}
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
index f4dbe4a..a9f3a71 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/ClasspathBasedHadoopClassFactory.java
@@ -17,19 +17,18 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-
public class ClasspathBasedHadoopClassFactory implements IHadoopClassFactory {
@Override
- public Object createMapper(String mapClassName,JobConf conf) throws Exception {
+ public Object createMapper(String mapClassName, JobConf conf) throws Exception {
Class clazz = loadClass(mapClassName);
return ReflectionUtils.newInstance(clazz, conf);
}
@Override
- public Object createReducer(String reduceClassName,JobConf conf) throws Exception {
+ public Object createReducer(String reduceClassName, JobConf conf) throws Exception {
Class clazz = loadClass(reduceClassName);
- return ReflectionUtils.newInstance(clazz, conf);
+ return ReflectionUtils.newInstance(clazz, conf);
}
@Override
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
index 22db448..c2ffeeb 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/IHadoopClassFactory.java
@@ -20,9 +20,9 @@
public interface IHadoopClassFactory extends Serializable {
- public Object createMapper(String mapClassName,JobConf conf) throws Exception;
+ public Object createMapper(String mapClassName, JobConf conf) throws Exception;
- public Object createReducer(String reduceClassName,JobConf conf) throws Exception;
+ public Object createReducer(String reduceClassName, JobConf conf) throws Exception;
public Class loadClass(String className) throws Exception;
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index 29239e3..9226c29 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -36,15 +36,15 @@
isClasses = new Class[inputSplits.length];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
- if(conf.getUseNewMapper()){
+ if (conf.getUseNewMapper()) {
for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
- ((Writable)inputSplits[i]).write(dos);
+ isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit) inputSplits[i]).getClass();
+ ((Writable) inputSplits[i]).write(dos);
}
} else {
for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
- ((Writable)inputSplits[i]).write(dos);
+ isClasses[i] = ((org.apache.hadoop.mapred.InputSplit) inputSplits[i]).getClass();
+ ((Writable) inputSplits[i]).write(dos);
}
}
dos.close();
@@ -52,17 +52,16 @@
}
- public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
- IOException {
+ public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException, IOException {
Object[] splits = new Object[isClasses.length];
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
for (int i = 0; i < splits.length; ++i) {
splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
- if(jobConf.getUseNewMapper()){
- ((Writable)splits[i]).readFields(dis);
- }else {
- ((Writable)splits[i]).readFields(dis);
- }
+ if (jobConf.getUseNewMapper()) {
+ ((Writable) splits[i]).readFields(dis);
+ } else {
+ ((Writable) splits[i]).readFields(dis);
+ }
}
return splits;
}
diff --git a/hyracks-examples/pom.xml b/hyracks-examples/pom.xml
index 4d42049..5281140 100644
--- a/hyracks-examples/pom.xml
+++ b/hyracks-examples/pom.xml
@@ -16,5 +16,6 @@
<module>text-example</module>
<module>btree-example</module>
<module>hyracks-integration-tests</module>
+ <module>hadoop-compat-example</module>
</modules>
</project>
diff --git a/hyracks-examples/text-example/textapp/.classpath b/hyracks-examples/text-example/textapp/.classpath
index 0dff09c..f2cc5f7 100644
--- a/hyracks-examples/text-example/textapp/.classpath
+++ b/hyracks-examples/text-example/textapp/.classpath
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>