Modified the HadoopMapper operator to work in two modes:
1) SelfReadMode: the mapper reads its input directly from HDFS instead of receiving it from a separate read operator.
2) DependentMode: the mapper is no longer a source operator; its input must be fed by another operator (e.g., a reducer in the case of chained MR jobs).
For operators A and B that are connected by a one-to-one connector, A and B can be fused into a single operator. The above change makes HadoopReadOperator redundant.
It is not being deleted here, as it remains a useful operator for reading from HDFS and could be used in other scenarios.
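For illustration, the two modes would be wired roughly as below. This is only a sketch: jobConf, splits, reducer, and hadoopClassFactory are assumed to be constructed elsewhere, and the constructor and connector names are those appearing in this patch.

    // SelfReadMode: the mapper is a source operator and reads its input splits
    // directly from HDFS, so no separate HadoopReadOperator (plus one-to-one
    // connector) is needed in front of it.
    HadoopMapperOperatorDescriptor selfReadingMapper =
        new HadoopMapperOperatorDescriptor(spec, jobConf, splits, hadoopClassFactory);

    // DependentMode (e.g., chained MR jobs): the mapper has input arity 1 and is
    // fed by an upstream operator such as a reducer over a one-to-one connector.
    HadoopMapperOperatorDescriptor dependentMapper =
        new HadoopMapperOperatorDescriptor(spec, jobConf, hadoopClassFactory);
    spec.connect(new OneToOneConnectorDescriptor(spec), reducer, 0, dependentMapper, 0);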
Modified AbstractHadoopOperatorDescriptor to take the input arity as a constructor argument. The input arity was previously assumed to be 1 for both Map and Reduce, but it is
0 for Map in SelfReadMode.
Modified the Reducer to pass the input arity to the base class constructor.
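In summary, the base-class constructor change looks roughly like this (taken from the hunks below):

    // Base class: the input arity is now a constructor parameter; output arity stays 1.
    public AbstractHadoopOperatorDescriptor(JobSpecification spec, int inputArity,
            RecordDescriptor recordDescriptor, JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) {
        super(spec, inputArity, 1);
        jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
        this.hadoopClassFactory = hadoopOperatorFactory;
        recordDescriptors[0] = recordDescriptor;
    }

    // Subclasses choose the arity: 0 for a self-reading mapper (source operator),
    // 1 for a dependent mapper or a reducer.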
git-svn-id: https://hyracks.googlecode.com/svn/trunk@176 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
index 90354f7..89831e8 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopOperatorDescriptor.java
@@ -63,9 +63,9 @@
private final Map<String, String> jobConfMap;
private IHadoopClassFactory hadoopClassFactory;
- public AbstractHadoopOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, JobConf jobConf,
- IHadoopClassFactory hadoopOperatorFactory) {
- super(spec, 1, 1);
+ public AbstractHadoopOperatorDescriptor(JobSpecification spec, int inputArity, RecordDescriptor recordDescriptor,
+ JobConf jobConf, IHadoopClassFactory hadoopOperatorFactory) {
+ super(spec, inputArity, 1);
jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
this.hadoopClassFactory = hadoopOperatorFactory;
recordDescriptors[0] = recordDescriptor;
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index e169a9f..bbe8689 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.dataflow.hadoop;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
@@ -25,8 +27,12 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -34,36 +40,32 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
public class HadoopMapperOperatorDescriptor<K1, V1, K2, V2> extends AbstractHadoopOperatorDescriptor {
- private class MapperOperator implements IOpenableDataWriterOperator {
- private OutputCollector<K2, V2> output;
- private Reporter reporter;
- private Mapper<K1, V1, K2, V2> mapper;
- private IOpenableDataWriter<Object[]> writer;
- private int partition;
- public MapperOperator(int partition) {
+ private class MapperBaseOperator {
+ protected OutputCollector<K2, V2> output;
+ protected Reporter reporter;
+ protected Mapper<K1, V1, K2, V2> mapper;
+ protected int partition;
+ protected JobConf conf;
+ protected IOpenableDataWriter<Object[]> writer;
+
+ public MapperBaseOperator(int partition) {
this.partition = partition;
- };
-
- @Override
- public void close() throws HyracksDataException {
- try {
- mapper.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- writer.close();
}
- @Override
- public void open() throws HyracksDataException {
+ protected void initializeMapper() throws HyracksDataException {
jobConf = getJobConf();
populateCache(jobConf);
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
@@ -72,23 +74,84 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
- if (inputSplitsProxy != null) {
- updateConfWithSplit();
- }
- mapper.configure(jobConf);
- writer.open();
- output = new DataWritingOutputCollector<K2, V2>(writer);
+ conf = new JobConf(jobConf);
+ conf.setClassLoader(jobConf.getClassLoader());
reporter = createReporter();
}
- private void updateConfWithSplit() {
+ protected void map(Object[] data) throws HyracksDataException {
try {
- InputSplit[] splits = inputSplitsProxy.toInputSplits(jobConf);
- InputSplit splitRead = splits[partition];
+ mapper.map((K1) data[0], (V1) data[1], output, reporter);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } catch (RuntimeException re) {
+ System.out.println(" Runtime exception encountered for row :" + data[0] + ": " + data[1]);
+ re.printStackTrace();
+ }
+ }
+
+ protected void closeMapper() throws HyracksDataException {
+ try {
+ mapper.close();
+ } catch (IOException ioe) {
+ throw new HyracksDataException(ioe);
+ }
+ }
+
+ }
+
+ private class MapperOperator extends MapperBaseOperator implements IOpenableDataWriterOperator {
+
+ public MapperOperator(int partition) {
+ super(partition);
+ };
+
+ @Override
+ public void close() throws HyracksDataException {
+ super.closeMapper();
+ writer.close();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ initializeMapper();
+ mapper.configure(conf);
+ writer.open();
+ output = new DataWritingOutputCollector<K2, V2>(writer);
+ }
+
+ @Override
+ public void writeData(Object[] data) throws HyracksDataException {
+ super.map(data);
+ }
+
+ public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+ if (index != 0) {
+ throw new IllegalArgumentException();
+ }
+ this.writer = writer;
+ }
+ }
+
+ private class ReaderMapperOperator extends MapperBaseOperator {
+
+ public ReaderMapperOperator(int partition, IOpenableDataWriter writer) throws HyracksDataException {
+ super(partition);
+ output = new DataWritingOutputCollector<K2, V2>(writer);
+ this.writer = writer;
+ this.writer.open();
+ }
+
+ protected void updateConfWithSplit(JobConf conf) {
+ try {
+ if (inputSplits == null) {
+ inputSplits = inputSplitsProxy.toInputSplits(conf);
+ }
+ InputSplit splitRead = inputSplits[partition];
if (splitRead instanceof FileSplit) {
- jobConf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
- jobConf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
- jobConf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
+ conf.set("map.input.file", ((FileSplit) splitRead).getPath().toString());
+ conf.setLong("map.input.start", ((FileSplit) splitRead).getStart());
+ conf.setLong("map.input.length", ((FileSplit) splitRead).getLength());
}
} catch (Exception e) {
e.printStackTrace();
@@ -98,21 +161,53 @@
}
}
- @Override
- public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
-
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
+ public void mapInput() throws HyracksDataException {
try {
- mapper.map((K1) data[0], (V1) data[1], output, reporter);
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ initializeMapper();
+ updateConfWithSplit(conf);
+ mapper.configure(conf);
+ conf.setClassLoader(this.getClass().getClassLoader());
+ RecordReader hadoopRecordReader;
+ Object key;
+ Object value;
+ InputSplit inputSplit = inputSplits[partition];
+ Class inputFormatClass = conf.getInputFormat().getClass();
+ InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+ hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf, createReporter());
+ Class inputKeyClass;
+ Class inputValueClass;
+ if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+ inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+ inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+ } else {
+ inputKeyClass = hadoopRecordReader.createKey().getClass();
+ inputValueClass = hadoopRecordReader.createValue().getClass();
+ }
+
+ key = hadoopRecordReader.createKey();
+ value = hadoopRecordReader.createValue();
+ RecordDescriptor outputRecordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) hadoopRecordReader.createKey().getClass(),
+ (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
+ int nFields = outputRecordDescriptor.getFields().length;
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+ Object[] data = new Object[2];
+ while (hadoopRecordReader.next(key, value)) {
+ data[0] = key;
+ data[1] = value;
+ super.map(data);
+ }
+ hadoopRecordReader.close();
} catch (IOException e) {
throw new HyracksDataException(e);
}
+
+ }
+
+ public void close() throws HyracksDataException {
+ super.closeMapper();
+ writer.close();
}
}
@@ -120,6 +215,7 @@
private Class<? extends Mapper> mapperClass;
private InputSplitsProxy inputSplitsProxy;
private transient InputSplit[] inputSplits;
+ private boolean selfRead = false;
private void initializeSplitInfo(InputSplit[] splits) throws IOException {
jobConf = super.getJobConf();
@@ -127,12 +223,16 @@
inputSplitsProxy = new InputSplitsProxy(splits);
}
+ public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
+ throws IOException {
+ super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
+ }
+
public HadoopMapperOperatorDescriptor(JobSpecification spec, JobConf jobConf, InputSplit[] splits,
IHadoopClassFactory hadoopClassFactory) throws IOException {
- super(spec, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
- if (splits != null) {
- initializeSplitInfo(splits);
- }
+ super(spec, 0, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
+ initializeSplitInfo(splits);
+ this.selfRead = true;
}
public static RecordDescriptor getRecordDescriptor(JobConf conf, IHadoopClassFactory hadoopClassFactory) {
@@ -169,22 +269,40 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- RecordDescriptor recordDescriptor = null;
+
JobConf conf = getJobConf();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
- if (inputSplits == null) {
- inputSplits = inputSplitsProxy.toInputSplits(conf);
+ if (selfRead) {
+ RecordDescriptor recordDescriptor = null;
+ if (inputSplits == null) {
+ inputSplits = inputSplitsProxy.toInputSplits(conf);
+ }
+ RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
+ super.createReporter());
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
+ .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
+ return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+ } else {
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
+ .getInputRecordDescriptor(this.odId, 0));
}
- RecordReader reader = conf.getInputFormat().getRecordReader(inputSplits[partition], conf,
- super.createReporter());
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) reader
- .createKey().getClass(), (Class<? extends Writable>) reader.createValue().getClass());
} catch (Exception e) {
throw new HyracksDataException(e);
}
+ }
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescriptor);
+ private IOperatorNodePushable createSelfReadingMapper(final IHyracksContext ctx, IOperatorEnvironment env,
+ final RecordDescriptor recordDescriptor, final int partition) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ SerializingDataWriter writer = new SerializingDataWriter(ctx, recordDescriptor, this.writer);
+ ReaderMapperOperator readMapOp = new ReaderMapperOperator(partition, writer);
+ readMapOp.mapInput();
+ readMapOp.close();
+ }
+ };
}
public Class<? extends Mapper> getMapperClass() {
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 7dbed84..728929a 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -183,7 +183,7 @@
public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IComparatorFactory comparatorFactory,
IHadoopClassFactory classFactory) {
- super(spec, getRecordDescriptor(conf, classFactory), conf, classFactory);
+ super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
this.comparatorFactory = comparatorFactory;
}