Optimized Hadoop File Reader
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@136 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 82d0e9e..c12e663 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
public interface IActivityNode extends Serializable {
@@ -26,5 +27,5 @@
public IOperatorDescriptor getOwner();
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions);
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
deleted file mode 100644
index a9538d0..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
-
-public abstract class AbstractHadoopFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- public AbstractHadoopFileScanOperatorDescriptor(JobSpecification spec,
- RecordDescriptor recordDescriptor) {
- super(spec, 0, 1);
- recordDescriptors[0] = recordDescriptor;
- }
-
- protected abstract IRecordReader createRecordReader(InputSplit fileSplit, RecordDescriptor desc)
- throws Exception;
-
-
- protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String status) {
-
- }
- };
- }
-
-
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index e7fd9c1..89ee92f 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -15,271 +15,158 @@
package edu.uci.ics.hyracks.dataflow.hadoop;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitHandler;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-import edu.uci.ics.hyracks.dataflow.std.file.IRecordReader;
-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class HadoopReadOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class HadoopReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
- protected transient InputSplit[] splits;
- protected transient Path splitFilePath;
- protected transient JobConf jobConf;
- protected transient Reporter reporter;
+ private String inputFormatClassName;
+ private Map<String, String> jobConfMap;
+ private InputSplitsProxy inputSplitsProxy;
- private static final long serialVersionUID = 1L;
- private String inputFormatClassName;
- private Map<String, String> jobConfMap;
- private static String splitDirectory = "/tmp/splits";
+    /**
+     * Builds a read operator for the input described by {@code jobConf}.
+     * <p>
+     * Computes the input splits up front, derives the output record descriptor from the key and
+     * value classes created by a record reader over the first split, constrains the operator to one
+     * partition per split, and stores the splits in a serializable {@link InputSplitsProxy}.
+     *
+     * @param jobConf Hadoop job configuration; supplies the input format and the number of map tasks
+     * @param spec job specification passed to the superclass constructor
+     * @throws IOException if the splits cannot be computed, the input format yields no splits, or the
+     *             probing record reader cannot be opened or closed
+     */
+    public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec) throws IOException {
+        super(spec, 0, 1);
+        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
+        InputFormat inputFormat = jobConf.getInputFormat();
+        InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf.getNumMapTasks());
+        if (splits == null || splits.length == 0) {
+            // Without at least one split there is nothing to probe for the key/value classes.
+            throw new IOException("Input format " + inputFormat.getClass().getName()
+                    + " produced no input splits");
+        }
+        // This reader exists only to learn the key/value classes, so release it as soon as they are known.
+        RecordReader recordReader = inputFormat.getRecordReader(splits[0], jobConf, createReporter());
+        try {
+            recordDescriptors[0] = DatatypeHelper.createKeyValueRecordDescriptor(
+                    (Class<? extends Writable>) recordReader.createKey().getClass(),
+                    (Class<? extends Writable>) recordReader.createValue().getClass());
+        } finally {
+            recordReader.close();
+        }
+        this.setPartitionConstraint(new PartitionCountConstraint(splits.length));
+        inputSplitsProxy = new InputSplitsProxy(splits);
+        this.inputFormatClassName = inputFormat.getClass().getName();
+    }
- private void initialize() {
- try {
- reporter = createReporter();
- if (jobConf == null) {
- jobConf = DatatypeHelper.map2JobConf(jobConfMap);
- }
- splits = InputSplitHandler.getInputSplits(jobConf, splitFilePath);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- }
+ /**
+ * Creates a stub {@link Reporter} to hand to Hadoop when a record reader is requested.
+ * <p>
+ * Both {@code getCounter} overloads and {@code getInputSplit} return {@code null}; both
+ * {@code incrCounter} overloads, {@code progress} and {@code setStatus} have empty bodies.
+ *
+ * @return a new stub reporter on every call
+ */
+ protected Reporter createReporter() {
+ return new Reporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return null;
+ }
- protected class FileScanOperator implements IOpenableDataWriterOperator {
- private IOpenableDataWriter<Object[]> writer;
+ @Override
+ public Counter getCounter(String group, String name) {
+ return null;
+ }
- private int index;
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
- FileScanOperator(int index) {
- this.index = index;
- }
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
- @Override
- public void setDataWriter(int index,
- IOpenableDataWriter<Object[]> writer) {
- if (index != 0) {
- throw new IndexOutOfBoundsException("Invalid index: " + index);
- }
- this.writer = writer;
- }
+ }
- @Override
- public void open() throws HyracksDataException {
- if (splits == null) {
- // initialize splits by reading from split file
- initialize();
- }
- InputSplit split = splits[index];
- RecordDescriptor desc = recordDescriptors[0];
- try {
- IRecordReader reader = createRecordReader(split, desc);
- if (desc == null) {
- desc = recordDescriptors[0];
- }
- writer.open();
- try {
- while (true) {
- Object[] record = new Object[desc.getFields().length];
- if (!reader.read(record)) {
- break;
- }
- writer.writeData(record);
- }
- } finally {
- reader.close();
- writer.close();
- splitFilePath = null;
- }
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
- @Override
- public void close() throws HyracksDataException {
- // do nothing
- splitFilePath = null;
+ }
- }
+ @Override
+ public void progress() {
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
- }
+ }
- protected class HDFSCustomReader implements IRecordReader {
- private RecordReader hadoopRecordReader;
- private Object key;
- private Object value;
- private FileSystem fileSystem;
+ @Override
+ public void setStatus(String status) {
- public HDFSCustomReader(Map<String, String> jobConfMap,
- InputSplit inputSplit, String inputFormatClassName,
- Reporter reporter) {
- try {
- JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
- try {
- fileSystem = FileSystem.get(conf);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
+ }
+ };
+ }
- Class inputFormatClass = Class.forName(inputFormatClassName);
- InputFormat inputFormat = (InputFormat) ReflectionUtils
- .newInstance(inputFormatClass, conf);
- hadoopRecordReader = (RecordReader) inputFormat
- .getRecordReader(inputSplit, conf, reporter);
- Class inputKeyClass;
- Class inputValueClass;
- if (hadoopRecordReader instanceof SequenceFileRecordReader) {
- inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader)
- .getKeyClass();
- inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader)
- .getValueClass();
- } else {
- inputKeyClass = hadoopRecordReader.createKey().getClass();
- inputValueClass = hadoopRecordReader.createValue()
- .getClass();
- }
- key = inputKeyClass.newInstance();
- value = inputValueClass.newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void close() {
- try {
- hadoopRecordReader.close();
- if (fileSystem != null) {
- fileSystem.delete(splitFilePath);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public boolean read(Object[] record) throws Exception {
- if (!hadoopRecordReader.next(key, value)) {
- return false;
- }
- if (record.length == 1) {
- record[0] = value;
- } else {
- record[0] = key;
- record[1] = value;
- }
- return true;
- }
- }
-
- public HadoopReadOperatorDescriptor(JobConf jobConf, JobSpecification spec)
- throws IOException {
- super(spec, 0, 1);
- this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
- InputFormat inputFormat = jobConf.getInputFormat();
- InputSplit[] splits = inputFormat.getSplits(jobConf, jobConf
- .getNumMapTasks());
- RecordReader recordReader = inputFormat.getRecordReader(splits[0],
- jobConf, createReporter());
- super.recordDescriptors[0] = DatatypeHelper
- .createKeyValueRecordDescriptor(
- (Class<? extends Writable>) recordReader.createKey()
- .getClass(),
- (Class<? extends Writable>) recordReader.createValue()
- .getClass());
- String suffix = "" + System.currentTimeMillis();
- splitFilePath = new Path(splitDirectory, suffix);
- InputSplitHandler.writeSplitFile(splits, jobConf, splitFilePath);
- this
- .setPartitionConstraint(new PartitionCountConstraint(
- splits.length));
- this.inputFormatClassName = inputFormat.getClass().getName();
- }
-
- protected IRecordReader createRecordReader(InputSplit fileSplit,
- RecordDescriptor desc) throws Exception {
- IRecordReader recordReader = new HDFSCustomReader(jobConfMap,
- fileSplit, inputFormatClassName, reporter);
- return recordReader;
- }
-
- protected Reporter createReporter() {
- return new Reporter() {
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public InputSplit getInputSplit()
- throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public void progress() {
-
- }
-
- @Override
- public void setStatus(String status) {
-
- }
- };
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
- IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(
- partition), null);
- }
-
-}
+    /**
+     * Creates the source runtime for one partition. Its {@code initialize()} reads the input split
+     * at index {@code partition} with a Hadoop record reader and pushes the records to the output
+     * writer as frames of serialized tuples.
+     * <p>
+     * A two-field output record carries (key, value); a one-field output record carries only the
+     * value. The last, partially filled frame is flushed once the reader is exhausted, and the
+     * record reader is closed whether or not the read succeeds.
+     *
+     * @param ctx context used to allocate the output frame and to build the tuple appender
+     * @param env not used by this implementation
+     * @param recordDescProvider supplies this operator's output record descriptor
+     * @param partition index of the input split this runtime reads
+     * @param nPartitions not used by this implementation
+     * @return a pushable whose {@code initialize()} performs the whole read
+     * @throws HyracksDataException not thrown by this method's own body; read failures surface from
+     *             {@code initialize()} of the returned runtime
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
+                    InputSplit[] splits = inputSplitsProxy.toInputSplits();
+                    InputSplit inputSplit = splits[partition];
+                    Class inputFormatClass = Class.forName(inputFormatClassName);
+                    InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
+                    RecordReader hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(inputSplit, conf,
+                            createReporter());
+                    try {
+                        Class inputKeyClass;
+                        Class inputValueClass;
+                        if (hadoopRecordReader instanceof SequenceFileRecordReader) {
+                            inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
+                            inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
+                        } else {
+                            inputKeyClass = hadoopRecordReader.createKey().getClass();
+                            inputValueClass = hadoopRecordReader.createValue().getClass();
+                        }
+                        Writable key = (Writable) inputKeyClass.newInstance();
+                        Writable value = (Writable) inputValueClass.newInstance();
+                        ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
+                        FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                        appender.reset(outBuffer, true);
+                        RecordDescriptor outputRecordDescriptor = recordDescProvider.getOutputRecordDescriptor(
+                                getOperatorId(), 0);
+                        int nFields = outputRecordDescriptor.getFields().length;
+                        ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
+                        // True while outBuffer holds tuples that have not been handed to the writer yet.
+                        boolean frameHasTuples = false;
+                        // NOTE(review): assumes this source operator owns the writer's open/close
+                        // lifecycle - confirm against the base class contract.
+                        writer.open();
+                        try {
+                            while (hadoopRecordReader.next(key, value)) {
+                                tb.reset();
+                                switch (nFields) {
+                                    case 2:
+                                        tb.addField(outputRecordDescriptor.getFields()[0], key);
+                                        tb.addField(outputRecordDescriptor.getFields()[1], value);
+                                        break;
+                                    case 1:
+                                        // A single-field record carries only the value, serialized by field 0.
+                                        tb.addField(outputRecordDescriptor.getFields()[0], value);
+                                        break;
+                                }
+                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    // Frame is full: ship it, then retry the same tuple on an empty frame.
+                                    FrameUtils.flushFrame(outBuffer, writer);
+                                    appender.reset(outBuffer, true);
+                                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                        throw new IllegalStateException("Record does not fit into an empty frame");
+                                    }
+                                }
+                                frameHasTuples = true;
+                            }
+                            if (frameHasTuples) {
+                                // Ship the last, partially filled frame; otherwise its records are lost.
+                                FrameUtils.flushFrame(outBuffer, writer);
+                            }
+                        } finally {
+                            writer.close();
+                        }
+                    } finally {
+                        hadoopRecordReader.close();
+                    }
+                } catch (InstantiationException e) {
+                    throw new HyracksDataException(e);
+                } catch (IllegalAccessException e) {
+                    throw new HyracksDataException(e);
+                } catch (ClassNotFoundException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
deleted file mode 100644
index e6f5395..0000000
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitHandler.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright 2009-2010 University of California, Irvine
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.hadoop.util;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-
-public class InputSplitHandler {
-
- private static final int CURRENT_SPLIT_FILE_VERSION = 0;
- private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
- public static InputSplit[] getInputSplits(JobConf jobConf, Path splitFilePath) throws IOException {
- RawSplit [] rawSplits = RawSplit.readSplitsFile(jobConf, splitFilePath);
- InputSplit [] inputSplits = new InputSplit[rawSplits.length];
- for(int i=0;i<rawSplits.length;i++){
- inputSplits[i] = getInputSplit(jobConf, rawSplits[i]);
- }
- return inputSplits;
- }
-
- public static void writeSplitFile(InputSplit[] inputSplits, JobConf jobConf,Path splitFilePath)
- throws IOException {
- RawSplit.writeSplits(inputSplits ,jobConf, splitFilePath);
- }
-
- private static InputSplit getInputSplit(JobConf jobConf, RawSplit rawSplit) throws IOException{
- InputSplit inputSplit = null;
- String splitClass = rawSplit.getClassName();
- BytesWritable split = rawSplit.getBytes();
- try {
- inputSplit = (InputSplit)
- ReflectionUtils.newInstance(jobConf.getClassByName(splitClass), jobConf);
- } catch (ClassNotFoundException exp) {
- IOException wrap = new IOException("Split class " + splitClass +
- " not found");
- wrap.initCause(exp);
- throw wrap;
- }
- DataInputBuffer splitBuffer = new DataInputBuffer();
- splitBuffer.reset(split.getBytes(), 0, split.getLength());
- inputSplit.readFields(splitBuffer);
- return inputSplit;
- }
-
- protected static class RawSplit implements Writable {
- private String splitClass;
- private BytesWritable bytes = new BytesWritable();
- private String[] locations;
- long dataLength;
-
- public void setBytes(byte[] data, int offset, int length) {
- bytes.set(data, offset, length);
- }
-
- public void setClassName(String className) {
- splitClass = className;
- }
-
- public String getClassName() {
- return splitClass;
- }
-
- public BytesWritable getBytes() {
- return bytes;
- }
-
- public void clearBytes() {
- bytes = null;
- }
-
- public void setLocations(String[] locations) {
- this.locations = locations;
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- public void readFields(DataInput in) throws IOException {
- splitClass = Text.readString(in);
- dataLength = in.readLong();
- bytes.readFields(in);
- int len = WritableUtils.readVInt(in);
- locations = new String[len];
- for(int i=0; i < len; ++i) {
- locations[i] = Text.readString(in);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, splitClass);
- out.writeLong(dataLength);
- bytes.write(out);
- WritableUtils.writeVInt(out, locations.length);
- for(int i = 0; i < locations.length; i++) {
- Text.writeString(out, locations[i]);
- }
- }
-
- public long getDataLength() {
- return dataLength;
- }
-
- public void setDataLength(long l) {
- dataLength = l;
- }
-
- public static RawSplit[] readSplitsFile(JobConf conf, Path splitFilePath) throws IOException{
- FileSystem fs = FileSystem.get(conf);
- DataInputStream splitFile =
- fs.open(splitFilePath);
- try {
- byte[] header = new byte[SPLIT_FILE_HEADER.length];
- splitFile.readFully(header);
- if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
- throw new IOException("Invalid header on split file");
- }
- int vers = WritableUtils.readVInt(splitFile);
- if (vers != CURRENT_SPLIT_FILE_VERSION) {
- throw new IOException("Unsupported split version " + vers);
- }
- int len = WritableUtils.readVInt(splitFile);
- RawSplit[] result = new RawSplit[len];
- for(int i=0; i < len; ++i) {
- result[i] = new RawSplit();
- result[i].readFields(splitFile);
- }
- return result;
-
- } finally {
- splitFile.close();
- }
- }
-
- public static int writeSplits(InputSplit[] splits, JobConf job,
- Path submitSplitFile) throws IOException {
- // sort the splits into order based on size, so that the biggest
- // go first
- Arrays.sort(splits, new Comparator<InputSplit>() {
- public int compare(InputSplit a, InputSplit b) {
- try {
- long left = a.getLength();
-
- long right = b.getLength();
- if (left == right) {
- return 0;
- } else if (left < right) {
- return 1;
- } else {
- return -1;
- }
- } catch (IOException ie) {
- throw new RuntimeException("Problem getting input split size",
- ie);
- }
- }
- });
- DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
- try {
- DataOutputBuffer buffer = new DataOutputBuffer();
- RawSplit rawSplit = new RawSplit();
- for(InputSplit split: splits) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- split.write(buffer);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- } finally {
- out.close();
- }
- return splits.length;
- }
-
- private static DataOutputStream writeSplitsFileHeader(Configuration conf,
- Path filename,
- int length
- ) throws IOException {
- FileSystem fs = filename.getFileSystem(conf);
- FSDataOutputStream out = fs.create(filename);
- out.write(SPLIT_FILE_HEADER);
- WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
- WritableUtils.writeVInt(out, length);
- return out;
- }
- }
-
-}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
new file mode 100644
index 0000000..ba23c4b
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/util/InputSplitsProxy.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * Serializable carrier for an array of Hadoop {@link InputSplit}s.
+ * <p>
+ * The constructor records each split's concrete class and has every split write itself, in array
+ * order, into a single byte array. {@link #toInputSplits()} reverses this by instantiating each
+ * recorded class and letting the new instance read its fields back from those bytes.
+ */
+public class InputSplitsProxy implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Class<? extends InputSplit>[] isClasses;
+    private final byte[] bytes;
+
+    /**
+     * Captures the classes and serialized contents of the given splits.
+     *
+     * @param inputSplits splits to capture; array order is preserved
+     * @throws IOException if a split fails while writing itself
+     */
+    public InputSplitsProxy(InputSplit[] inputSplits) throws IOException {
+        final int count = inputSplits.length;
+        isClasses = new Class[count];
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(sink);
+        int slot = 0;
+        for (InputSplit split : inputSplits) {
+            isClasses[slot++] = split.getClass();
+            split.write(out);
+        }
+        out.close();
+        bytes = sink.toByteArray();
+    }
+
+    /**
+     * Rebuilds the captured splits in their original order.
+     *
+     * @return newly instantiated splits populated from the stored bytes
+     * @throws InstantiationException if a recorded split class cannot be instantiated
+     * @throws IllegalAccessException if a recorded split class or its no-argument constructor is not accessible
+     * @throws IOException if a split fails while reading its fields
+     */
+    public InputSplit[] toInputSplits() throws InstantiationException, IllegalAccessException, IOException {
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+        InputSplit[] rebuilt = new InputSplit[isClasses.length];
+        int slot = 0;
+        while (slot < rebuilt.length) {
+            InputSplit split = isClasses[slot].newInstance();
+            split.readFields(in);
+            rebuilt[slot] = split;
+            slot++;
+        }
+        return rebuilt;
+    }
+}
\ No newline at end of file