The Hadoop operators currently do not support the 'org.apache.hadoop.mapreduce.*' types and hence cannot run MapReduce jobs that reference those types. To be compatible, the operators need to support them.
> This change adds support for the mapreduce libraries. The changes are spread across all the Hadoop operators, and the compatibility layer also changes in order to support the mapreduce package.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@184 123451ca-8445-de46-9d55-352943316053
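
The patch uses one dispatch pattern throughout: each operator checks JobConf.getUseNewMapper() and then drives either the old org.apache.hadoop.mapred API or the new org.apache.hadoop.mapreduce API. The sketch below is a minimal, standalone illustration of that pattern, not code taken from this patch; it assumes the Hadoop 0.20-era API in which org.apache.hadoop.mapreduce.JobContext and TaskAttemptContext are concrete classes (as the patch itself does), and the RecordReaderFactory class name is hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.util.ReflectionUtils;

    public class RecordReaderFactory {
        // Returns either an org.apache.hadoop.mapred.RecordReader or an
        // org.apache.hadoop.mapreduce.RecordReader, depending on which API
        // the job was written against. Callers branch on getUseNewMapper()
        // again before consuming the reader, as the operators in this patch do.
        public static Object getRecordReader(JobConf conf, Object split, Reporter reporter)
                throws IOException, InterruptedException, ClassNotFoundException {
            if (conf.getUseNewMapper()) {
                // New (mapreduce) API path: instantiate the configured
                // InputFormat reflectively and create a reader for the split.
                JobContext context = new JobContext(conf, null);
                org.apache.hadoop.mapreduce.InputFormat inputFormat =
                        (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils
                                .newInstance(context.getInputFormatClass(), conf);
                TaskAttemptContext taskContext =
                        new TaskAttemptContext(conf, new TaskAttemptID());
                return inputFormat.createRecordReader(
                        (org.apache.hadoop.mapreduce.InputSplit) split, taskContext);
            } else {
                // Old (mapred) API path.
                org.apache.hadoop.mapred.InputFormat inputFormat =
                        (org.apache.hadoop.mapred.InputFormat) ReflectionUtils
                                .newInstance(conf.getInputFormat().getClass(), conf);
                return inputFormat.getRecordReader(
                        (org.apache.hadoop.mapred.InputSplit) split, conf, reporter);
            }
        }
    }

The same split-on-getUseNewMapper() structure recurs below for record writers, output formats, and input-split serialization (InputSplitsProxy), which is why several signatures in the diff widen from InputSplit[] to Object[].
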
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index bbe8689..08df437 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -15,24 +15,23 @@
package edu.uci.ics.hyracks.dataflow.hadoop;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -41,9 +40,7 @@
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
@@ -147,11 +144,15 @@
if (inputSplits == null) {
inputSplits = inputSplitsProxy.toInputSplits(conf);
}
- InputSplit splitRead = inputSplits[partition];
+ Object splitRead = inputSplits[partition];
if (splitRead instanceof FileSplit) {
conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
+ } else if ( splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+ conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath().toString());
+ conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+ conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
}
} catch (Exception e) {
e.printStackTrace();
@@ -161,44 +162,52 @@
}
}
- public void mapInput() throws HyracksDataException {
+ public void mapInput() throws HyracksDataException, InterruptedException, ClassNotFoundException {
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
initializeMapper();
updateConfWithSplit(conf);
mapper.configure(conf);
conf.setClassLoader(this.getClass().getClassLoader());
- RecordReader hadoopRecordReader;
- Object key;
- Object value;
- InputSplit inputSplit = inputSplits[partition];
- Class inputFormatClass = conf.getInputFormat().getClass();
- InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
- Class inputKeyClass;
- Class inputValueClass;
- if (hadoopRecordReader instanceof SequenceFileRecordReader) {
- inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
- inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
- } else {
- inputKeyClass = hadoopRecordReader.createKey().getClass();
- inputValueClass = hadoopRecordReader.createValue().getClass();
- }
-
- key = hadoopRecordReader.createKey();
- value = hadoopRecordReader.createValue();
- RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
- (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
- int nFields = outputRecordDescriptor.getFields().length;
- ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+ Object reader;
+ Object key = null;
+ Object value = null;
+ Object inputSplit = inputSplits[partition];
+ reader = getRecordReader(conf, inputSplit);
+ Class inputKeyClass = null;
+ Class inputValueClass = null;
Object[] data = new Object[2];
- while (hadoopRecordReader.next(key, value)) {
- data[0] = key;
- data[1] = value;
- super.map(data);
+
+ if(conf.getUseNewMapper()){
+ org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
+ newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplit, new TaskAttemptContext(conf, new TaskAttemptID()));
+ while(newReader.nextKeyValue()){
+ data[0] = newReader.getCurrentKey();
+ if(data[0] == null) {
+ data[0] = NullWritable.get();
+ }
+ data[1] = newReader.getCurrentValue();
+ super.map(data);
+ }
+ } else {
+ RecordReader oldReader = (RecordReader)reader;
+ if (reader instanceof SequenceFileRecordReader) {
+ inputKeyClass = ((SequenceFileRecordReader) oldReader).getKeyClass();
+ inputValueClass = ((SequenceFileRecordReader) oldReader).getValueClass();
+ } else {
+ inputKeyClass = oldReader.createKey().getClass();
+ inputValueClass = oldReader.createValue().getClass();
+ }
+ key = oldReader.createKey();
+ value = oldReader.createValue();
+ while (oldReader.next(key, value)) {
+ data[0] = key;
+ data[1] = value;
+ super.map(data);
+ }
+ oldReader.close();
}
- hadoopRecordReader.close();
+
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -214,13 +223,13 @@
private static final long serialVersionUID = 1L;
private Class<? extends Mapper> mapperClass;
private InputSplitsProxy inputSplitsProxy;
- private transient InputSplit[] inputSplits;
+ private transient Object[] inputSplits;
private boolean selfRead = false;
- private void initializeSplitInfo(InputSplit[] splits) throws IOException {
+ private void initializeSplitInfo(Object[] splits) throws IOException {
jobConf = super.getJobConf();
InputFormat inputFormat = jobConf.getInputFormat();
- inputSplitsProxy = new InputSplitsProxy(splits);
+ inputSplitsProxy = new InputSplitsProxy(jobConf,splits);
}
public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
@@ -228,7 +237,7 @@
super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
}
- public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, InputSplit[] splits,
+ public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, Object[] splits,
IHadoopClassFactory hadoopClassFactory) throws IOException {
super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
initializeSplitInfo(splits);
@@ -266,6 +275,19 @@
}
}
+ private Object getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, InterruptedException {
+ if(conf.getUseNewMapper()){
+ JobContext context = new JobContext(conf,null);
+ org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+ TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf,new TaskAttemptID());
+ return inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+ } else {
+ Class inputFormatClass = conf.getInputFormat().getClass();
+ InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+ return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, super.createReporter());
+ }
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
@@ -278,10 +300,23 @@
if (inputSplits == null) {
inputSplits = inputSplitsProxy.toInputSplits(conf);
}
- RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
- super.createReporter());
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
- .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
+ Object reader = getRecordReader(conf,inputSplits[partition]);
+ if(conf.getUseNewMapper()) {
+ org.apache.hadoop.mapreduce.RecordReader newReader = (org.apache.hadoop.mapreduce.RecordReader)reader;
+ newReader.initialize((org.apache.hadoop.mapreduce.InputSplit)inputSplits[partition], new TaskAttemptContext(conf, new TaskAttemptID()));
+ newReader.nextKeyValue();
+ Object key = newReader.getCurrentKey();
+ Class keyClass = null;
+ if (key == null) {
+ keyClass = Class.forName("org.apache.hadoop.io.NullWritable");
+ }
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) keyClass,
+ (Class<? extends Writable>) newReader.getCurrentValue().getClass());
+ } else {
+ RecordReader oldReader = (RecordReader)reader;
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) oldReader
+ .createKey().getClass(), (Class<? extends Writable>) oldReader.createValue().getClass());
+ }
return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
} else {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
@@ -299,7 +334,11 @@
public void initialize() throws HyracksDataException {
SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
- readMapOp.mapInput();
+ try {
+ readMapOp.mapInput();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
readMapOp.close();
}
};
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 428bafe..93ef1ec 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
@@ -60,35 +62,40 @@
return jobConf;
}
- public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, InputSplit[] splits) throws IOException {
+ public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec, Object[] splits) throws IOException {
super(spec, 0, 1);
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
InputFormat inputFormat = jobConf.getInputFormat();
- RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
+ RecordReader recordReader;
+ try {
+ recordReader = getRecordReader(DatatypeHelper.map2JobConf(jobConfMap), splits[0]);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) recordReader
.createKey().getClass(), (Class<? extends Writable>) recordReader.createValue().getClass());
this.setPartitionConstraint(new PartitionCountConstraint(splits.length));
- inputSplitsProxy = new InputSplitsProxy(splits);
+ inputSplitsProxy = new InputSplitsProxy(jobConf,splits);
this.inputFormatClassName = inputFormat.getClass().getName();
- try {
- checkSplits(jobConf);
- } catch (Exception e) {
- e.printStackTrace();
- }
}
- public InputSplit[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
- return inputSplitsProxy.toInputSplits(getJobConf());
- }
-
- private void checkSplits(JobConf conf) throws Exception {
- InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
- for (InputSplit inputSplit : splits) {
- Class inputFormatClass = Class.forName(inputFormatClassName);
+ private RecordReader getRecordReader(JobConf conf, Object inputSplit) throws ClassNotFoundException, IOException, InterruptedException {
+ RecordReader hadoopRecordReader = null;
+ if(conf.getUseNewMapper()){
+ JobContext context = new JobContext(conf,null);
+ org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+ TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf, null);
+ hadoopRecordReader = (RecordReader) inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+ } else {
+ Class inputFormatClass = conf.getInputFormat().getClass();
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- RecordReader hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf,
- createReporter());
+ hadoopRecordReader = (RecordReader)inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, createReporter());
}
+ return hadoopRecordReader;
+ }
+
+ public Object[] getInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+ return inputSplitsProxy.toInputSplits(getJobConf());
}
protected Reporter createReporter() {
@@ -145,11 +152,20 @@
RecordReader hadoopRecordReader;
Object key;
Object value;
- InputSplit[] splits = inputSplitsProxy.toInputSplits(conf);
- InputSplit inputSplit = splits[partition];
- Class inputFormatClass = Class.forName(inputFormatClassName);
- InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
+ Object[] splits = inputSplitsProxy.toInputSplits(conf);
+ Object inputSplit = splits[partition];
+
+ if(conf.getUseNewMapper()){
+ JobContext context = new JobContext(conf,null);
+ org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
+ TaskAttemptContext taskAttemptContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(jobConf, null);
+ hadoopRecordReader = (RecordReader) inputFormat.createRecordReader((org.apache.hadoop.mapreduce.InputSplit)inputSplit,taskAttemptContext);
+ } else {
+ Class inputFormatClass = conf.getInputFormat().getClass();
+ InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+ hadoopRecordReader = (RecordReader)inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit)inputSplit, conf, createReporter());
+ }
+
Class inputKeyClass;
Class inputValueClass;
if (hadoopRecordReader instanceof SequenceFileRecordReader) {
@@ -197,6 +213,8 @@
throw new HyracksDataException(e);
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
+ } catch (InterruptedException e){
+ throw new HyracksDataException(e);
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 728929a..b7d3296 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IDataReader;
@@ -229,8 +230,18 @@
}
public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory classFactory) {
- String outputKeyClassName = conf.getOutputKeyClass().getName();
- String outputValueClassName = conf.getOutputValueClass().getName();
+ String outputKeyClassName =null;
+ String outputValueClassName = null;
+
+ if(conf.getUseNewMapper()) {
+ JobContext context = new JobContext(conf,null);
+ outputKeyClassName = context.getOutputKeyClass().getName();
+ outputValueClassName = context.getOutputValueClass().getName();
+ } else {
+ outputKeyClassName = conf.getOutputKeyClass().getName();
+ outputValueClassName = conf.getOutputValueClass().getName();
+ }
+
RecordDescriptor recordDescriptor = null;
try {
if (classFactory == null) {
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
index a2dac50..24103c6 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopWriteOperatorDescriptor.java
@@ -29,10 +29,19 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
@@ -44,42 +53,45 @@
private static String nullWritableClassName = NullWritable.class.getName();
- private static class HDFSWriter extends RecordWriter {
+ private static class HadoopFileWriter implements IRecordWriter {
- HDFSWriter(FileSystem fileSystem, String hdfsPath, int[] columns, char separator) throws Exception {
- super(columns, separator, new Object[] { fileSystem, hdfsPath });
- }
-
- @Override
- public OutputStream createOutputStream(Object[] args) throws Exception {
- FSDataOutputStream fs = ((FileSystem) args[0]).create(new Path((String) args[1]));
- return fs;
+ Object recordWriter;
+ JobConf conf;
+ final boolean useNewMapReduceLib;
+ HadoopFileWriter(Object recordWriter,JobConf conf) {
+ this.recordWriter = recordWriter;
+ this.conf = conf;
+ useNewMapReduceLib = conf.getUseNewMapper();
}
@Override
public void write(Object[] record) throws Exception {
- if (!nullWritableClassName.equals((record[0].getClass().getName()))) {
- bufferedWriter.write(String.valueOf(record[0]));
- }
- if (!nullWritableClassName.equals((record[1].getClass().getName()))) {
- bufferedWriter.write(separator);
- bufferedWriter.write(String.valueOf(record[1]));
- }
- bufferedWriter.write("\n");
- }
- }
-
- private static class HDFSSequenceWriter extends RecordWriter {
- private Writer writer;
-
- HDFSSequenceWriter(FileSystem fileSystem, String hdfsPath, Writer writer) throws Exception {
- super(null, COMMA, new Object[] { fileSystem, hdfsPath });
- this.writer = writer;
+ if (useNewMapReduceLib){
+ ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
+ }
}
@Override
- public OutputStream createOutputStream(Object[] args) throws Exception {
- return null;
+ public void close() {
+ try {
+ if (useNewMapReduceLib){
+ ((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
+ } else {
+ ((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private static class HadoopSequenceWriter implements IRecordWriter {
+ private Writer writer;
+
+ HadoopSequenceWriter(Writer writer) throws Exception {
+ this.writer = writer;
}
@Override
@@ -100,45 +112,100 @@
}
private static final long serialVersionUID = 1L;
- private static final char COMMA = ',';
- private boolean sequenceFileOutput = false;
Map<String, String> jobConfMap;
@Override
- protected IRecordWriter createRecordWriter(File file, int index) throws Exception {
+ protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
conf.setClassLoader(this.getClass().getClassLoader());
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(conf);
} catch (IOException ioe) {
ioe.printStackTrace();
}
- Path path = new Path(file.getPath());
- checkIfCanWriteToHDFS(new FileSplit[] { new FileSplit("localhost", file) });
- FSDataOutputStream outputStream = fileSystem.create(path);
- outputStream.close();
- if (sequenceFileOutput) {
+ Path path = new Path(fileSplit.getPath());
+ checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
+ Object recordWriter = null;
+ boolean sequenceFileOutput = false;
+ Object outputFormat = null;
+ if(conf.getUseNewMapper()){
+ org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat)ReflectionUtils.newInstance((new JobContext(conf,null)).getOutputFormatClass(),conf);
+ if(newOutputFormat instanceof org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat) {
+ sequenceFileOutput = true;
+ }else{
+ recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
+ }
+ }else {
+ if(conf.getOutputFormat() instanceof SequenceFileOutputFormat) {
+ sequenceFileOutput = true;
+ } else {
+ recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, "" + System.currentTimeMillis(), new Progressable() {
+ @Override
+ public void progress() {}
+ });
+ }
+ }
+ if(!sequenceFileOutput) {
+ return new HadoopFileWriter(recordWriter,conf);
+ } else {
Class keyClass = Class.forName(conf.getOutputKeyClass().getName());
Class valueClass = Class.forName(conf.getOutputValueClass().getName());
conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
Writer writer = SequenceFile.createWriter(fileSystem, conf, path, keyClass, valueClass);
- return new HDFSSequenceWriter(fileSystem, file.getAbsolutePath(), writer);
- } else {
- return new HDFSWriter(fileSystem, file.getPath(), null, COMMA);
- }
+ return new HadoopSequenceWriter(writer);
+ }
}
+
+
+
+ protected Reporter createReporter() {
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+
+ }
+
+ @Override
+ public void progress() {
+
+ }
+
+ @Override
+ public void setStatus(String status) {
+
+ }
+ };
+}
private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
- boolean canWrite = true;
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
- FileSystem fileSystem = null;
try {
- fileSystem = FileSystem.get(conf);
+ FileSystem fileSystem = FileSystem.get(conf);
for (FileSplit fileSplit : fileSplits) {
- Path path = new Path(fileSplit.getLocalFile().getPath());
- canWrite = !fileSystem.exists(path);
- if (!canWrite) {
+ Path path = new Path(fileSplit.getPath());
+ if (fileSystem.exists(path)) {
throw new Exception(" Output path : already exists : " + path);
}
}
@@ -146,12 +213,18 @@
ioe.printStackTrace();
throw ioe;
}
- return canWrite;
+ return true;
}
- private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) {
+ private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
- if (conf.getOutputFormat() instanceof NullOutputFormat) {
+ Object outputFormat = null;
+ if(conf.getUseNewMapper()) {
+ outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
+ } else {
+ outputFormat = conf.getOutputFormat();
+ }
+ if (outputFormat instanceof NullOutputFormat) {
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
for (int i = 0; i < numOutputters; i++) {
String outputPath = "/tmp/" + System.currentTimeMillis() + i;
@@ -162,12 +235,14 @@
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
+ System.out.println("absolute path:" + absolutePath);
for (int index = 0; index < numOutputters; index++) {
String suffix = new String("part-00000");
suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
suffix = suffix + index;
String outputPath = absolutePath + "/" + suffix;
- outputFileSplits[index] = new FileSplit("localhost", new File(outputPath));
+ System.out.println("output path :" + outputPath);
+ outputFileSplits[index] = new FileSplit("localhost", outputPath);
}
return outputFileSplits;
}
@@ -178,6 +253,5 @@
super(jobSpec, getOutputSplits(jobConf, numMapTasks));
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
checkIfCanWriteToHDFS(super.splits);
- this.sequenceFileOutput = (jobConf.getOutputFormat() instanceof SequenceFileOutputFormat);
}
}
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
index c685724..29239e3 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.Serializable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
@@ -28,28 +29,40 @@
public class InputSplitsProxy implements Serializable {
private static final long serialVersionUID = 1L;
- private final Class<? extends InputSplit>[] isClasses;
+ private final Class[] isClasses;
private final byte[] bytes;
- public InputSplitsProxy(InputSplit[] inputSplits) throws IOException {
+ public InputSplitsProxy(JobConf conf, Object[] inputSplits) throws IOException {
isClasses = new Class[inputSplits.length];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
- for (int i = 0; i < inputSplits.length; ++i) {
- isClasses[i] = inputSplits[i].getClass();
- inputSplits[i].write(dos);
+ if(conf.getUseNewMapper()){
+ for (int i = 0; i < inputSplits.length; ++i) {
+ isClasses[i] = ((org.apache.hadoop.mapreduce.InputSplit)inputSplits[i]).getClass();
+ ((Writable)inputSplits[i]).write(dos);
+ }
+ } else {
+ for (int i = 0; i < inputSplits.length; ++i) {
+ isClasses[i] = ((org.apache.hadoop.mapred.InputSplit)inputSplits[i]).getClass();
+ ((Writable)inputSplits[i]).write(dos);
+ }
}
dos.close();
bytes = baos.toByteArray();
+
}
- public InputSplit[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
+ public Object[] toInputSplits(JobConf jobConf) throws InstantiationException, IllegalAccessException,
IOException {
- InputSplit[] splits = new InputSplit[isClasses.length];
+ Object[] splits = new Object[isClasses.length];
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
for (int i = 0; i < splits.length; ++i) {
splits[i] = ReflectionUtils.newInstance(isClasses[i], jobConf);
- splits[i].readFields(dis);
+ if(jobConf.getUseNewMapper()){
+ ((Writable)splits[i]).readFields(dis);
+ }else {
+ ((Writable)splits[i]).readFields(dis);
+ }
}
return splits;
}