Integrated more native MR operators and a Shuffle Connector
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1244 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index e621a09..f79a464 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -17,10 +17,9 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
public interface IInputChannel {
- public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException;
+ public void registerMonitor(IInputChannelMonitor monitor);
public void setAttachment(Object attachment);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index ae2cd37..609d7f0 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -63,7 +63,7 @@
}
@Override
- public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+ public void registerMonitor(IInputChannelMonitor monitor) {
this.monitor = monitor;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index b742316..0ea3124 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -23,7 +23,6 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -55,7 +54,7 @@
}
@Override
- public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+ public void registerMonitor(IInputChannelMonitor monitor) {
this.monitor = monitor;
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
new file mode 100644
index 0000000..e8b6e8b
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopHelper.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
+public class HadoopHelper {
+ public static final int KEY_FIELD_INDEX = 0; // tuple field holding the map-output key
+ public static final int VALUE_FIELD_INDEX = 1; // tuple field holding the map-output value
+ public static final int BLOCKID_FIELD_INDEX = 2; // third field; an int in getMapOutputRecordDescriptor()
+ private static final int[] KEY_SORT_FIELDS = new int[] { 0 }; // handed out by getSortFields()
+
+ private MarshalledWritable<Configuration> mConfig; // kept exactly as passed to the constructor
+ private Configuration config; // obtained from mConfig.get() in the constructor
+ private Job job; // Hadoop Job created from config in the constructor
+
+ public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+ this.mConfig = mConfig;
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); // remembered so it can be restored in finally
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ config = mConfig.get(); // unmarshalled while this class's loader is the thread context loader
+ config.setClassLoader(getClass().getClassLoader()); // config.getClassLoader() is reused by the create*Context methods
+ job = new Job(config);
+ } catch (Exception e) {
+ throw new HyracksDataException(e); // every failure above surfaces as a HyracksDataException
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+    public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
+        try {
+            ISerializerDeserializer[] fields = new ISerializerDeserializer[3]; // key, value, trailing int field
+            Class<? extends Writable> keyClass = (Class<? extends Writable>) job.getMapOutputKeyClass();
+            fields[KEY_FIELD_INDEX] = DatatypeHelper.createSerializerDeserializer(keyClass);
+            Class<? extends Writable> valueClass = (Class<? extends Writable>) job.getMapOutputValueClass();
+            fields[VALUE_FIELD_INDEX] = DatatypeHelper.createSerializerDeserializer(valueClass);
+            fields[BLOCKID_FIELD_INDEX] = IntegerSerializerDeserializer.INSTANCE;
+            return new RecordDescriptor(fields);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Same as getMapOutputRecordDescriptor() but with only the key and value fields (no trailing int field). */
+    public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
+        try {
+            ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
+            Class<? extends Writable> keyClass = (Class<? extends Writable>) job.getMapOutputKeyClass();
+            fields[KEY_FIELD_INDEX] = DatatypeHelper.createSerializerDeserializer(keyClass);
+            Class<? extends Writable> valueClass = (Class<? extends Writable>) job.getMapOutputValueClass();
+            fields[VALUE_FIELD_INDEX] = DatatypeHelper.createSerializerDeserializer(valueClass);
+            return new RecordDescriptor(fields);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+ public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader()); // construct under the configuration's loader
+ return new TaskAttemptContext(config, taId);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL); // put the caller's context loader back
+ }
+ }
+
+ public JobContext createJobContext() {
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader()); // construct under the configuration's loader
+ return new JobContext(config, null); // second constructor argument is deliberately passed as null
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL); // put the caller's context loader back
+ }
+ }
+
+ public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+ try {
+ return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass()); // unchecked: type arguments are chosen by the caller
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+ try {
+ return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass()); // unchecked: type arguments are chosen by the caller
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
+ try {
+ return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass()); // NOTE(review): presumably callers check hasCombiner() first; a null class is not guarded here
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ } catch (InstantiationException e) {
+ throw new HyracksDataException(e);
+ } catch (IllegalAccessException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+ try {
+ return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config); // created via Hadoop's ReflectionUtils, which receives config
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e); // the configured input format class could not be resolved
+ }
+ }
+
+    public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // run the InputFormat under this class's loader
+            InputFormat<K, V> fmt = getInputFormat();
+            try {
+                return fmt.getSplits(new JobContext(config, null));
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to code further up the stack
+                throw new HyracksDataException(e);
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL); // always restore the caller's context loader
+        }
+    }
+
+    /** Returns a one-element array whose factory is built from the class of the job's sort comparator. */
+    public IBinaryComparatorFactory[] getSortComparatorFactories() {
+        IBinaryComparatorFactory sortFactory = new WritableComparingBinaryComparatorFactory(
+                job.getSortComparator().getClass());
+        return new IBinaryComparatorFactory[] { sortFactory };
+    }
+
+    /** Returns a one-element array whose factory is built from the class of the job's grouping comparator. */
+    public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+        IBinaryComparatorFactory groupingFactory = new WritableComparingBinaryComparatorFactory(
+                job.getGroupingComparator().getClass());
+        return new IBinaryComparatorFactory[] { groupingFactory };
+    }
+
+ public RawComparator<?> getRawGroupingComparator() {
+ return job.getGroupingComparator(); // the comparator instance itself, not a factory wrapper
+ }
+
+    public int getSortFrameLimit(IHyracksCommonContext ctx) {
+        final long sortBytes = job.getConfiguration().getInt("io.sort.mb", 100) * 1024L * 1024L; // setting is scaled by 1024*1024; 100 if unset
+        return (int) (sortBytes / ctx.getFrameSize()); // number of frames that fit into that budget
+    }
+
+ public Job getJob() {
+ return job; // the Job created from the configuration in the constructor
+ }
+
+ public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+ return mConfig; // the same marshalled object that was passed to the constructor
+ }
+
+ public Configuration getConfiguration() {
+ return config; // the Configuration obtained from mConfig.get() in the constructor
+ }
+
+    /** Builds the partition-computer factory from the job's Partitioner class and the map-output key/value serdes. */
+    public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+        try {
+            return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+                    (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public int[] getSortFields() {
+        return KEY_SORT_FIELDS.clone(); // defensive copy: callers must not be able to alter the shared static array
+    }
+
+    public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+        Class<? extends Writable> keyClass = (Class<? extends Writable>) job.getMapOutputKeyClass();
+        return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer(keyClass); // unchecked: K is the caller's choice
+    }
+
+    public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+        Class<? extends Writable> valueClass = (Class<? extends Writable>) job.getMapOutputValueClass();
+        return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer(valueClass); // unchecked: V is the caller's choice
+    }
+
+ public FileSystem getFilesystem() throws HyracksDataException {
+ try {
+ return FileSystem.get(config); // file system selected by this helper's configuration
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+ try {
+ return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config); // created via Hadoop's ReflectionUtils, which receives config
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e); // the configured output format class could not be resolved
+ }
+ }
+
+ public boolean hasCombiner() throws HyracksDataException {
+ try {
+ return job.getCombinerClass() != null; // true only when the job reports a combiner class
+ } catch (ClassNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
new file mode 100644
index 0000000..7e4d67f
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HadoopTools.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+public class HadoopTools {
+    /** Loads className via this class's loader (also made the context loader for the call) and instantiates it. */
+    public static Object newInstance(String className)
+            throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+        final ClassLoader saved = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+            return newInstance(Class.forName(className, true, HadoopTools.class.getClassLoader()));
+        } finally {
+            Thread.currentThread().setContextClassLoader(saved); // caller's loader is restored on every exit path
+        }
+    }
+
+    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+        return clazz.newInstance(); // reflective instantiation through Class.newInstance()
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..7c6bf86
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.util.BitSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.connectors.PartitionDataWriter;
+
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final MarshalledWritable<Configuration> mConfig; // passed on to HadoopHelper and ShuffleFrameReader
+
+    public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+        super(spec);
+        this.mConfig = mConfig;
+    }
+
+    /** Sender side: a PartitionDataWriter driven by the partitioner from HadoopHelper.getTuplePartitionComputer(). */
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        ITuplePartitionComputerFactory factory = new HadoopHelper(mConfig).getTuplePartitionComputer();
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, factory.createPartitioner());
+    }
+
+    /** Receiver side: marks partitions 0..nProducerPartitions-1 as expected and reads via a ShuffleFrameReader. */
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expected = new BitSet();
+        expected.set(0, nProducerPartitions);
+        NonDeterministicChannelReader reader = new NonDeterministicChannelReader(nProducerPartitions, expected);
+        IFrameReader frames = new ShuffleFrameReader(ctx, reader, mConfig);
+        return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expected, frames,
+                reader);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
new file mode 100644
index 0000000..1545c06
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProvider {
+    InputSplit next() throws HyracksDataException; // NOTE(review): how exhaustion is signalled is not visible here; confirm
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..73588ab
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/IInputSplitProviderFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputSplitProviderFactory extends Serializable {
+    IInputSplitProvider createInputSplitProvider(int id) throws HyracksDataException; // NOTE(review): meaning of id not visible here
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
new file mode 100644
index 0000000..b37084e
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/InputFileSplit.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** Input split for a byte range of a file, additionally tagged with a block id and a schedule time. */
+public class InputFileSplit extends InputSplit implements Writable {
+    private Path file;
+    private long start;
+    private long length;
+    private int blockId;
+    private String[] hosts; // not part of the serialized form: see write() and readFields()
+    private long scheduleTime;
+
+    public InputFileSplit() {
+    }
+
+    public InputFileSplit(int blockId, Path file, long start, long length, String[] hosts, long schedule_time) {
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.blockId = blockId;
+        this.hosts = hosts;
+        this.scheduleTime = schedule_time;
+    }
+
+    public int blockId() {
+        return this.blockId;
+    }
+
+    public long scheduleTime() {
+        return scheduleTime;
+    }
+
+    public Path getPath() {
+        return this.file;
+    }
+
+    /** Offset of the first byte of the file covered by this split. */
+    public long getStart() {
+        return this.start;
+    }
+
+    /** Number of bytes of the file covered by this split. */
+    @Override
+    public long getLength() {
+        return this.length;
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(file) + ":" + start + "+" + length;
+    }
+
+    /** Writes path, start, length, block id and schedule time, in that order; hosts are not written. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, file.toString());
+        out.writeLong(start);
+        out.writeLong(length);
+        out.writeInt(blockId);
+        out.writeLong(scheduleTime);
+    }
+
+    /** Reads the fields back in the order write() emits them; hosts is cleared because it is never written. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        file = new Path(Text.readString(in));
+        start = in.readLong();
+        length = in.readLong();
+        hosts = null;
+        blockId = in.readInt();
+        scheduleTime = in.readLong();
+    }
+
+    /** Returns the stored hosts array, or an empty array when none is set. */
+    @Override
+    public String[] getLocations() throws IOException {
+        return hosts != null ? hosts : new String[] {};
+    }
+
+    public FileSplit toFileSplit() {
+        return new FileSplit(file, start, length, hosts);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
new file mode 100644
index 0000000..a00fa7f0
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/KVIterator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/** RawKeyValueIterator over the key and value fields of the tuples stored in a list of frames. */
+public class KVIterator implements RawKeyValueIterator {
+    private final HadoopHelper helper; // kept from the constructor; field indexes are read statically below
+    private FrameTupleAccessor accessor;
+    private DataInputBuffer kBuffer;
+    private DataInputBuffer vBuffer;
+    private List<ByteBuffer> buffers;
+    private int bSize; // number of leading entries of buffers to iterate over
+    private int bPtr; // index of the frame currently loaded into accessor
+    private int tIdx; // index of the current tuple within that frame
+    private boolean eog; // set once every tuple of every frame has been consumed
+
+    public KVIterator(IHyracksTaskContext ctx, HadoopHelper helper, RecordDescriptor recordDescriptor) {
+        this.helper = helper;
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        kBuffer = new DataInputBuffer();
+        vBuffer = new DataInputBuffer();
+    }
+
+    /** Points the iterator at the first bSize frames of buffers and rewinds it to just before the first tuple. */
+    void reset(List<ByteBuffer> buffers, int bSize) {
+        this.buffers = buffers;
+        this.bSize = bSize;
+        bPtr = 0;
+        tIdx = -1; // next() pre-increments, so -1 means "before the first tuple"
+        eog = bSize <= 0;
+        if (!eog) {
+            accessor.reset(buffers.get(0));
+        }
+    }
+
+    /** Returns the shared key buffer; it is re-pointed by every successful next(). */
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+        return kBuffer;
+    }
+
+    /** Returns the shared value buffer; it is re-pointed by every successful next(). */
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+        return vBuffer;
+    }
+
+    /** Advances to the next tuple, moving on to later frames as needed; returns false once all frames are used up. */
+    @Override
+    public boolean next() throws IOException {
+        while (!eog) {
+            if (++tIdx < accessor.getTupleCount()) {
+                // Point the key and value buffers at this tuple's fields inside the frame's backing array.
+                byte[] frame = accessor.getBuffer().array();
+                kBuffer.reset(frame,
+                        FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.KEY_FIELD_INDEX),
+                        accessor.getFieldLength(tIdx, HadoopHelper.KEY_FIELD_INDEX));
+                vBuffer.reset(frame,
+                        FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, HadoopHelper.VALUE_FIELD_INDEX),
+                        accessor.getFieldLength(tIdx, HadoopHelper.VALUE_FIELD_INDEX));
+                return true;
+            }
+            // The current frame is exhausted: load the next one, or mark the end if none is left.
+            if (++bPtr >= bSize) {
+                eog = true;
+            } else {
+                tIdx = -1;
+                accessor.reset(buffers.get(bPtr));
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Nothing to do: this iterator opens no resources of its own.
+    }
+
+    /** Progress is not tracked by this iterator, so null is returned. */
+    @Override
+    public Progress getProgress() {
+        return null;
+    }
+}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..4a61296
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
+/**
+ * Source operator that runs a Hadoop {@link Mapper} over the input splits handed out for one partition.
+ * <p>
+ * The map output of every split (a "block") is sorted with an external sort, optionally passed through
+ * the job's combiner, tagged with the block id as a third field and pushed to the operator's output.
+ */
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final int jobId;
+    private final MarshalledWritable<Configuration> config;
+    private final IInputSplitProviderFactory factory;
+
+    /**
+     * @param spec
+     *            job specification this operator is registered with
+     * @param jobId
+     *            id used when building Hadoop task attempt ids
+     * @param config
+     *            marshalled Hadoop job configuration
+     * @param factory
+     *            creates the per-partition provider of input splits
+     * @throws HyracksDataException
+     *             if the Hadoop helper cannot be built from {@code config}
+     */
+    public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
+            IInputSplitProviderFactory factory) throws HyracksDataException {
+        super(spec, 0, 1);
+        this.jobId = jobId;
+        this.config = config;
+        this.factory = factory;
+        HadoopHelper helper = new HadoopHelper(config);
+        recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(config);
+        final Configuration conf = helper.getConfiguration();
+        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+        final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
+        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
+
+        final int framesLimit = helper.getSortFrameLimit(ctx);
+        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
+        /**
+         * Record writer handed to the mapper: frames the emitted pairs, feeds them to an external sort
+         * run generator and, once the block is done, merges the runs into the operator output.
+         */
+        class SortingRecordWriter extends RecordWriter<K2, V2> {
+            private final ArrayTupleBuilder tb;
+            private final ByteBuffer frame;
+            private final FrameTupleAppender fta;
+            private ExternalSortRunGenerator runGen;
+            private int blockId;
+
+            public SortingRecordWriter() throws HyracksDataException {
+                tb = new ArrayTupleBuilder(2);
+                frame = ctx.allocateFrame();
+                fta = new FrameTupleAppender(ctx.getFrameSize());
+                fta.reset(frame, true);
+            }
+
+            /** Starts a fresh sort for the block with the given id. */
+            public void initBlock(int blockId) throws HyracksDataException {
+                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+                this.blockId = blockId;
+            }
+
+            @Override
+            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+                // Nothing to do here: blocks are flushed explicitly through sortAndFlushBlock().
+            }
+
+            @Override
+            public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                DataOutput dos = tb.getDataOutput();
+                tb.reset();
+                key.write(dos);
+                tb.addFieldEndOffset();
+                value.write(dos);
+                tb.addFieldEndOffset();
+                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // Frame is full: hand it to the sorter and retry on an empty frame.
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException("Record does not fit in an empty frame");
+                    }
+                }
+            }
+
+            /**
+             * Finishes the sort of the current block and pushes the merged result to {@code writer},
+             * running the combiner in between when the job defines one.
+             */
+            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+                if (fta.getTupleCount() > 0) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                }
+                runGen.close();
+                // Appends the block id as a third field to every (key, value) tuple before it goes out.
+                IFrameWriter delegatingWriter = new IFrameWriter() {
+                    private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                    private final ByteBuffer outFrame = ctx.allocateFrame();
+                    private final FrameTupleAccessor inAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields());
+                    private final ArrayTupleBuilder outTb = new ArrayTupleBuilder(3);
+
+                    @Override
+                    public void open() throws HyracksDataException {
+                        appender.reset(outFrame, true);
+                    }
+
+                    @Override
+                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        inAccessor.reset(buffer);
+                        int n = inAccessor.getTupleCount();
+                        for (int i = 0; i < n; ++i) {
+                            outTb.reset();
+                            outTb.addField(inAccessor, i, 0);
+                            outTb.addField(inAccessor, i, 1);
+                            try {
+                                outTb.getDataOutput().writeInt(blockId);
+                            } catch (IOException e) {
+                                throw new HyracksDataException(e);
+                            }
+                            outTb.addFieldEndOffset();
+                            if (!appender.append(outTb.getFieldEndOffsets(), outTb.getByteArray(), 0,
+                                    outTb.getSize())) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                appender.reset(outFrame, true);
+                                if (!appender.append(outTb.getFieldEndOffsets(), outTb.getByteArray(), 0,
+                                        outTb.getSize())) {
+                                    throw new IllegalStateException("Tuple does not fit in an empty frame");
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void close() throws HyracksDataException {
+                        // Only flushes; the operator output itself is closed by the pushable.
+                        if (appender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                        }
+                    }
+
+                    @Override
+                    public void fail() throws HyracksDataException {
+                        // No state to clean up in this adapter.
+                    }
+                };
+                if (helper.hasCombiner()) {
+                    Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
+                    TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
+                    // Build the combiner's context from its own attempt id (previously taId was passed here).
+                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
+                    final IFrameWriter outputWriter = delegatingWriter;
+                    // Frames the combiner's output and forwards it to the block-id tagging writer.
+                    RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
+                        private final FrameTupleAppender fta = new FrameTupleAppender(ctx.getFrameSize());
+                        private final ByteBuffer buffer = ctx.allocateFrame();
+                        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+
+                        {
+                            fta.reset(buffer, true);
+                            outputWriter.open();
+                        }
+
+                        @Override
+                        public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                            DataOutput dos = tb.getDataOutput();
+                            tb.reset();
+                            key.write(dos);
+                            tb.addFieldEndOffset();
+                            value.write(dos);
+                            tb.addFieldEndOffset();
+                            if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                                fta.reset(buffer, true);
+                                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException("Record does not fit in an empty frame");
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+                            if (fta.getTupleCount() > 0) {
+                                FrameUtils.flushFrame(buffer, outputWriter);
+                            }
+                            // The downstream writer was opened unconditionally, so close it unconditionally.
+                            outputWriter.close();
+                        }
+                    };
+                    delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
+                            new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
+                            helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
+                            ctaskAttemptContext);
+                }
+                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                for (int i = 0; i < comparatorFactories.length; ++i) {
+                    comparators[i] = comparatorFactories[i].createBinaryComparator();
+                }
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+                        runGen.getRuns(), new int[] { 0 }, comparators,
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+                merger.process();
+            }
+        }
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                writer.open();
+                try {
+                    SortingRecordWriter recordWriter = new SortingRecordWriter();
+                    InputSplit split = null;
+                    int blockId = 0;
+                    while ((split = isp.next()) != null) {
+                        try {
+                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+                                    taskAttemptContext);
+                            // Initialize the reader with this operator's class loader as context loader.
+                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+                            try {
+                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                                recordReader.initialize(split, taskAttemptContext);
+                            } finally {
+                                Thread.currentThread().setContextClassLoader(ctxCL);
+                            }
+                            try {
+                                recordWriter.initBlock(blockId);
+                                Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
+                                        recordWriter, null, null, split);
+                                mapper.run(mCtx);
+                            } finally {
+                                // Release the reader even when the mapper fails.
+                                recordReader.close();
+                            }
+                            recordWriter.sortAndFlushBlock(writer);
+                            ++blockId;
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        } catch (InterruptedException e) {
+                            // Preserve the interrupt for code further up the stack.
+                            Thread.currentThread().interrupt();
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
new file mode 100644
index 0000000..be05f22
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Serializable holder for a Hadoop {@link Writable}.
+ * <p>
+ * The wrapped object is stored as a byte array containing its class name followed by the output of
+ * its own {@code write} method, and is re-created from those bytes on every call to {@link #get()}.
+ *
+ * @param <T>
+ *            type of the wrapped writable
+ */
+public class MarshalledWritable<T extends Writable> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // Class name (UTF) followed by the writable's serialized form; null until set() is called.
+    private byte[] bytes;
+
+    public MarshalledWritable() {
+    }
+
+    /**
+     * Captures the current state of {@code o}.
+     *
+     * @param o
+     *            writable to marshal
+     * @throws IOException
+     *             if the writable fails to serialize itself
+     */
+    public void set(T o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        try {
+            dos.writeUTF(o.getClass().getName());
+            o.write(dos);
+        } finally {
+            dos.close();
+        }
+        bytes = baos.toByteArray();
+    }
+
+    /**
+     * Re-creates the marshalled writable.
+     *
+     * @return a new instance populated from the stored bytes
+     * @throws IllegalStateException
+     *             if {@link #set} has not been called
+     * @throws Exception
+     *             if the class cannot be instantiated or the bytes cannot be read back
+     */
+    @SuppressWarnings("unchecked")
+    public T get() throws Exception {
+        if (bytes == null) {
+            throw new IllegalStateException("No value has been set on this MarshalledWritable");
+        }
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+        try {
+            String className = dis.readUTF();
+            // The class name was recorded from a T in set(), so the cast holds for bytes produced there.
+            T o = (T) HadoopTools.newInstance(className);
+            o.readFields(dis);
+            return o;
+        } finally {
+            dis.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
new file mode 100644
index 0000000..33d58af
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReduceWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Frame writer that groups consecutive tuples with equal grouping fields and runs a Hadoop
+ * {@link Reducer} once per group, sending the reducer's output to a {@link RecordWriter}.
+ * <p>
+ * Groups are detected by comparing each tuple with the one immediately before it, so tuples of
+ * the same group are expected to arrive next to each other.
+ */
+public class ReduceWriter<K2, V2, K3, V3> implements IFrameWriter {
+    private final IHyracksTaskContext ctx;
+    private final HadoopHelper helper;
+    private final int[] groupFields;
+    // Reads the frame currently being processed.
+    private final FrameTupleAccessor accessor0;
+    // Reads copyFrame, i.e. the last non-empty frame seen, to compare across frame boundaries.
+    private final FrameTupleAccessor accessor1;
+    private final ByteBuffer copyFrame;
+    private final IBinaryComparator[] comparators;
+    private final KVIterator kvi;
+    private final Reducer<K2, V2, K3, V3> reducer;
+    private final RecordWriter<K3, V3> recordWriter;
+    private final TaskAttemptID taId;
+    private final TaskAttemptContext taskAttemptContext;
+
+    private boolean first;
+    private boolean groupStarted;
+    // Frames holding the tuples of the group being accumulated; reused from group to group.
+    private List<ByteBuffer> group;
+    // Index into group of the frame currently being appended to.
+    private int bPtr;
+    private FrameTupleAppender fta;
+    private Counter keyCounter;
+    private Counter valueCounter;
+
+    public ReduceWriter(IHyracksTaskContext ctx, HadoopHelper helper, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+            Reducer<K2, V2, K3, V3> reducer, RecordWriter<K3, V3> recordWriter, TaskAttemptID taId,
+            TaskAttemptContext taskAttemptContext) {
+        this.ctx = ctx;
+        this.helper = helper;
+        this.groupFields = groupFields;
+        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        copyFrame = ctx.allocateFrame();
+        accessor1.reset(copyFrame);
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.reducer = reducer;
+        this.recordWriter = recordWriter;
+        this.taId = taId;
+        this.taskAttemptContext = taskAttemptContext;
+
+        kvi = new KVIterator(ctx, helper, recordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        first = true;
+        groupStarted = false;
+        group = new ArrayList<ByteBuffer>();
+        bPtr = 0;
+        group.add(ctx.allocateFrame());
+        fta = new FrameTupleAppender(ctx.getFrameSize());
+        keyCounter = new Counter() {
+        };
+        valueCounter = new Counter() {
+        };
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor0.reset(buffer);
+        int nTuples = accessor0.getTupleCount();
+        if (nTuples <= 0) {
+            // Ignore empty frames. Copying one into copyFrame would discard the last tuple seen and
+            // make the next cross-frame comparison read tuple index -1.
+            return;
+        }
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                groupInit();
+                first = false;
+            } else {
+                if (i == 0) {
+                    // Previous tuple lives in the copy of the preceding frame.
+                    switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+                } else {
+                    switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+                }
+            }
+            accumulate(accessor0, i);
+        }
+        // Keep this frame around so its last tuple can be compared with the next frame's first one.
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    /** Appends the tuple to the current group, moving on to (or allocating) the next group frame when full. */
+    private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+        if (!fta.append(accessor, tIndex)) {
+            ++bPtr;
+            if (group.size() <= bPtr) {
+                group.add(ctx.allocateFrame());
+            }
+            fta.reset(group.get(bPtr), true);
+            if (!fta.append(accessor, tIndex)) {
+                throw new IllegalStateException("Tuple does not fit in an empty frame");
+            }
+        }
+    }
+
+    /** Reduces the accumulated group and starts a new one when the two tuples differ on the grouping fields. */
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            reduce();
+            groupInit();
+        }
+    }
+
+    /** Rewinds the group frames so that accumulation starts again at the first frame. */
+    private void groupInit() {
+        groupStarted = true;
+        bPtr = 0;
+        fta.reset(group.get(0), true);
+    }
+
+    /** Runs the reducer over the tuples accumulated for the current group. */
+    @SuppressWarnings("unchecked")
+    private void reduce() throws HyracksDataException {
+        // Only the first bPtr + 1 frames hold tuples of this group; later ones are leftovers.
+        kvi.reset(group, bPtr + 1);
+        try {
+            Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+                    keyCounter, valueCounter, recordWriter, null, null,
+                    (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+                            .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+            reducer.run(rCtx);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        groupStarted = false;
+    }
+
+    /** Compares the two tuples on every grouping field using the configured comparators. */
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Flush the group that was still being accumulated when the input ended.
+        if (groupStarted) {
+            reduce();
+        }
+        try {
+            recordWriter.close(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..bbb0d74
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ReducerOperatorDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+/**
+ * Sink operator that feeds its input frames to a {@link ReduceWriter}, which runs the job's Hadoop
+ * {@link Reducer} and writes the results through the record writer of the job's output format.
+ */
+public class ReducerOperatorDescriptor<K2 extends Writable, V2 extends Writable, K3 extends Writable, V3 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final int jobId;
+
+    private MarshalledWritable<Configuration> mConfig;
+
+    /**
+     * @param spec
+     *            job specification this operator is registered with
+     * @param jobId
+     *            id used when building the Hadoop task attempt id
+     * @param mConfig
+     *            marshalled Hadoop job configuration
+     */
+    public ReducerOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> mConfig) {
+        super(spec, 1, 0);
+        this.mConfig = mConfig;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final HadoopHelper hadoop = new HadoopHelper(mConfig);
+        final Reducer<K2, V2, K3, V3> reduceFn = hadoop.getReducer();
+        final RecordDescriptor inputDescriptor = hadoop.getMapOutputRecordDescriptor();
+        final int[] keyFields = hadoop.getSortFields();
+        final IBinaryComparatorFactory[] groupComparators = hadoop.getGroupingComparatorFactories();
+
+        final TaskAttemptID attemptId = new TaskAttemptID("foo", jobId, false, partition, 0);
+        final TaskAttemptContext attemptContext = hadoop.createTaskAttemptContext(attemptId);
+        final RecordWriter output = openRecordWriter(hadoop, attemptContext);
+
+        final ReduceWriter<K2, V2, K3, V3> reduceWriter = new ReduceWriter<K2, V2, K3, V3>(ctx, hadoop, keyFields,
+                groupComparators, inputDescriptor, reduceFn, output, attemptId, attemptContext);
+
+        // The pushable is a thin shell: every data call is forwarded to the reduce writer.
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                reduceWriter.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                reduceWriter.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                reduceWriter.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+            }
+        };
+    }
+
+    /** Obtains the output format's record writer, wrapping any failure in a {@link HyracksDataException}. */
+    private static RecordWriter openRecordWriter(HadoopHelper hadoop, TaskAttemptContext attemptContext)
+            throws HyracksDataException {
+        try {
+            return hadoop.getOutputFormat().getRecordWriter(attemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
new file mode 100644
index 0000000..8e03eae
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
+/**
+ * Frame reader for the receiving side of the shuffle.
+ * <p>
+ * On {@link #open()} it drains every sender channel, writing the tuples of each (sender, block id)
+ * combination to a run file of its own, then merges all run files into a single run file that
+ * {@link #nextFrame(ByteBuffer)} reads back. The runs are handed to the merger as-is, so each block
+ * is presumably already sorted by its producer.
+ */
+public class ShuffleFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+    private final NonDeterministicChannelReader channelReader;
+    private final HadoopHelper helper;
+    private final RecordDescriptor recordDescriptor;
+    // Closed run files, one per (sender, block id) combination; filled by RunInfo.close().
+    private List<RunFileWriter> runFileWriters;
+    // Reader over the merged run; null until open() has completed.
+    private RunFileReader reader;
+
+    public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
+            MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.ctx = ctx;
+        this.channelReader = channelReader;
+        helper = new HadoopHelper(mConfig);
+        this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        channelReader.open();
+        runFileWriters = new ArrayList<RunFileWriter>();
+        spillIncomingBlocks();
+        reader = mergeRuns();
+        reader.open();
+    }
+
+    /** Drains all sender channels, writing each sender's tuples into one run file per block id. */
+    private void spillIncomingBlocks() throws HyracksDataException {
+        int nSenders = channelReader.getSenderPartitionCount();
+        RunInfo[] infos = new RunInfo[nSenders];
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        IInputChannel[] channels = channelReader.getChannels();
+        while (true) {
+            int entry = channelReader.findNextSender();
+            if (entry < 0) {
+                break;
+            }
+            RunInfo info = infos[entry];
+            IInputChannel channel = channels[entry];
+            ByteBuffer netBuffer = channel.getNextBuffer();
+            accessor.reset(netBuffer);
+            int nTuples = accessor.getTupleCount();
+            for (int i = 0; i < nTuples; ++i) {
+                int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+                if (info == null) {
+                    info = new RunInfo();
+                    info.reset(tBlockId);
+                    infos[entry] = info;
+                } else if (info.blockId != tBlockId) {
+                    // The sender moved on to another block: finish this run and start a new one.
+                    info.close();
+                    info.reset(tBlockId);
+                }
+                info.write(accessor, i);
+            }
+            channel.recycleBuffer(netBuffer);
+        }
+        // Finish the run that was still open for each sender.
+        for (int i = 0; i < infos.length; ++i) {
+            RunInfo info = infos[i];
+            if (info != null) {
+                info.close();
+            }
+        }
+    }
+
+    /** Merges all collected run files into one run file and returns a reader over it. */
+    private RunFileReader mergeRuns() throws HyracksDataException {
+        FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+        int framesLimit = helper.getSortFrameLimit(ctx);
+        IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        List<IFrameReader> runs = new LinkedList<IFrameReader>();
+        for (RunFileWriter rfw : runFileWriters) {
+            runs.add(rfw.createReader());
+        }
+        RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+                recordDescriptor, framesLimit, rfw);
+        merger.process();
+        return rfw.createReader();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        return reader.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // open() may have failed before the merged reader was created.
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /** Buffers the tuples of one sender and spills them to the run file of the block being received. */
+    private class RunInfo {
+        private final ByteBuffer buffer;
+        private final FrameTupleAppender fta;
+
+        private FileReference file;
+        private RunFileWriter rfw;
+        private int blockId;
+
+        public RunInfo() {
+            buffer = ctx.allocateFrame();
+            fta = new FrameTupleAppender(ctx.getFrameSize());
+        }
+
+        /** Starts a new run file for the given block id. */
+        public void reset(int blockId) throws HyracksDataException {
+            this.blockId = blockId;
+            fta.reset(buffer, true);
+            try {
+                file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
+                rfw = new RunFileWriter(file, ctx.getIOManager());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        /** Appends one tuple, spilling the buffered frame first when it is full. */
+        public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+            if (!fta.append(accessor, tIdx)) {
+                flush();
+                if (!fta.append(accessor, tIdx)) {
+                    throw new IllegalStateException("Tuple does not fit in an empty frame");
+                }
+            }
+        }
+
+        /** Spills any buffered tuples, closes the run file and registers it for the merge. */
+        public void close() throws HyracksDataException {
+            flush();
+            rfw.close();
+            runFileWriters.add(rfw);
+        }
+
+        private void flush() throws HyracksDataException {
+            if (fta.getTupleCount() <= 0) {
+                return;
+            }
+            // Expose the whole frame to the run file writer.
+            buffer.limit(buffer.capacity());
+            buffer.position(0);
+            rfw.nextFrame(buffer);
+            fta.reset(buffer, true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java
new file mode 100644
index 0000000..8942ea9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionAcceptor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+/**
+ * Callback through which a partition, together with the input channel that carries its data, is
+ * handed over to a consumer.
+ */
+public interface IPartitionAcceptor {
+    /**
+     * Registers a partition with this acceptor.
+     *
+     * @param pid
+     *            identifier of the partition
+     * @param channel
+     *            channel from which the partition's data can be read
+     */
+    void addPartition(PartitionId pid, IInputChannel channel);
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java
new file mode 100644
index 0000000..4962d64
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/IPartitionBatchManager.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPartitionBatchManager extends IPartitionAcceptor { // acceptor that also hands readers out in batches
+    void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException; // adds readers to 'batch'
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
new file mode 100644
index 0000000..713fb0e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -0,0 +1,77 @@
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
+    // Source of the buffers copied out by nextFrame().
+    private final IInputChannel channel;
+    // Frames announced via notifyDataAvailability() that nextFrame() has not consumed yet.
+    private int availableFrames;
+    // Becomes true in notifyEndOfStream().
+    private boolean eos;
+    // Becomes true in notifyFailure().
+    private boolean failed;
+
+    /** Wraps the given channel; availableFrames, eos and failed start at their Java defaults (0, false, false). */
+    public InputChannelFrameReader(IInputChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+    }
+
+    /** Waits for a frame, end of stream or failure; returns false at end of stream, throws once a failure is flagged. */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        synchronized (this) {
+            while (!(failed || eos || availableFrames > 0)) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            if (failed) {
+                throw new HyracksDataException("Failure occurred on input");
+            }
+            if (eos && availableFrames <= 0) {
+                return false;
+            }
+            availableFrames -= 1;
+        }
+        // The frame is claimed above; the copy itself runs outside the monitor.
+        ByteBuffer srcBuffer = channel.getNextBuffer();
+        FrameUtils.copy(srcBuffer, buffer);
+        channel.recycleBuffer(srcBuffer);
+        return true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    @Override
+    public synchronized void notifyFailure(IInputChannel channel) {
+        failed = true;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+        availableFrames += nFrames;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyEndOfStream(IInputChannel channel) {
+        eos = true;
+        notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
new file mode 100644
index 0000000..578bfec
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class NonDeterministicChannelReader implements IInputChannelMonitor, IPartitionAcceptor {
+    private static final Logger LOGGER = Logger.getLogger(NonDeterministicChannelReader.class.getName());
+
+    private final int nSenderPartitions;
+    // Channel per sender index; an entry is null until addPartition() supplies it.
+    private final IInputChannel[] channels;
+    // Senders flagged by notifyDataAvailability() as having frames pending.
+    private final BitSet frameAvailability;
+    // Per sender: frames announced and not yet handed out by findNextSender().
+    private final int[] availableFrameCounts;
+    // Senders that signalled end of stream or failure and whose channel findNextSender() has not closed yet.
+    private final BitSet eosSenders;
+    private final BitSet failSenders;
+    // Senders outside the expected set, plus senders whose channel findNextSender() has closed.
+    private final BitSet closedSenders;
+    // Sender picked by the most recent findNextSender() call; -1 once every sender is closed.
+    private int lastReadSender;
+
+    public NonDeterministicChannelReader(int nSenderPartitions, BitSet expectedPartitions) {
+        this.nSenderPartitions = nSenderPartitions;
+        channels = new IInputChannel[nSenderPartitions];
+        frameAvailability = new BitSet(nSenderPartitions);
+        availableFrameCounts = new int[nSenderPartitions];
+        eosSenders = new BitSet(nSenderPartitions);
+        failSenders = new BitSet(nSenderPartitions);
+        // Copy the expected set and invert it: every sender that is not expected starts out as closed.
+        closedSenders = new BitSet(nSenderPartitions);
+        closedSenders.or(expectedPartitions);
+        closedSenders.flip(0, nSenderPartitions);
+    }
+
+    @Override
+    public void addPartition(PartitionId pid, IInputChannel channel) {
+        channel.registerMonitor(this);
+        channel.setAttachment(pid);
+        synchronized (this) {
+            channels[pid.getSenderIndex()] = channel;
+        }
+    }
+
+    public int getSenderPartitionCount() {
+        return nSenderPartitions;
+    }
+
+    public void open() throws HyracksDataException {
+        lastReadSender = 0;
+    }
+
+    public IInputChannel[] getChannels() {
+        return channels;
+    }
+
+    /** Blocks until a sender has a frame pending and returns its index; returns -1 once every sender is closed. */
+    public synchronized int findNextSender() throws HyracksDataException {
+        for (;;) {
+            // Continue after the previous sender (unless it was 0), wrapping to the lowest pending sender.
+            int next = lastReadSender == 0 ? -1 : frameAvailability.nextSetBit(lastReadSender + 1);
+            if (next < 0) {
+                next = frameAvailability.nextSetBit(0);
+            }
+            lastReadSender = next;
+            if (next >= 0) {
+                assert availableFrameCounts[next] > 0;
+                if (--availableFrameCounts[next] == 0) {
+                    frameAvailability.clear(next);
+                }
+                return next;
+            }
+            if (!failSenders.isEmpty()) {
+                throw new HyracksDataException("Failure occurred on input");
+            }
+            // Clearing bit i inside the loop is what lets nextSetBit(i) advance.
+            for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
+                channels[i].close();
+                eosSenders.clear(i);
+                closedSenders.set(i);
+            }
+            int firstOpen = closedSenders.nextClearBit(0);
+            if (firstOpen < 0 || firstOpen >= nSenderPartitions) {
+                lastReadSender = -1;
+                return lastReadSender;
+            }
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public synchronized void close() throws HyracksDataException {
+        int i = closedSenders.nextClearBit(0);
+        while (i >= 0 && i < nSenderPartitions) {
+            if (channels[i] != null) {
+                channels[i].close();
+                channels[i] = null;
+            }
+            i = closedSenders.nextClearBit(i + 1);
+        }
+    }
+
+    @Override
+    public synchronized void notifyFailure(IInputChannel channel) {
+        final PartitionId partition = (PartitionId) channel.getAttachment();
+        final int sender = partition.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Failure: " + partition.getConnectorDescriptorId() + " sender: " + sender
+                    + " receiver: " + partition.getReceiverIndex());
+        }
+        failSenders.set(sender);
+        eosSenders.set(sender);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+        final PartitionId partition = (PartitionId) channel.getAttachment();
+        final int sender = partition.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Data available: " + partition.getConnectorDescriptorId() + " sender: " + sender
+                    + " receiver: " + partition.getReceiverIndex());
+        }
+        availableFrameCounts[sender] += nFrames;
+        frameAvailability.set(sender);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void notifyEndOfStream(IInputChannel channel) {
+        final PartitionId partition = (PartitionId) channel.getAttachment();
+        final int sender = partition.getSenderIndex();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("EOS: " + partition);
+        }
+        eosSenders.set(sender);
+        notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
new file mode 100644
index 0000000..657165c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class NonDeterministicFrameReader implements IFrameReader {
+    private final NonDeterministicChannelReader channelReader;
+
+    public NonDeterministicFrameReader(NonDeterministicChannelReader channelReader) {
+        this.channelReader = channelReader;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        channelReader.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        final int sender = channelReader.findNextSender();
+        if (sender < 0) { // negative index: no sender left to read from
+            return false;
+        }
+        final IInputChannel[] inputs = channelReader.getChannels();
+        final ByteBuffer srcFrame = inputs[sender].getNextBuffer();
+        FrameUtils.copy(srcFrame, buffer);
+        inputs[sender].recycleBuffer(srcFrame);
+        return true;
+    }
+
+    @Override
+    public synchronized void close() throws HyracksDataException {
+        channelReader.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java
new file mode 100644
index 0000000..ba25850
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionBatchManager.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class NonDeterministicPartitionBatchManager implements IPartitionBatchManager {
+    private final IInputChannel[] channels;
+    // Readers that arrived while no getNextBatch() call was waiting for them.
+    private List<IFrameReader> partitions;
+    // List owned by a blocked getNextBatch() caller; null whenever no call is waiting.
+    private List<IFrameReader> batch;
+    // Size the waiting caller asked for; only meaningful while batch is non-null.
+    private int requiredSize;
+
+    public NonDeterministicPartitionBatchManager(int nSenders) {
+        channels = new IInputChannel[nSenders];
+        partitions = new ArrayList<IFrameReader>();
+    }
+
+    @Override
+    public synchronized void addPartition(PartitionId pid, IInputChannel channel) {
+        channels[pid.getSenderIndex()] = channel;
+        InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+        channel.registerMonitor(channelReader);
+        if (batch != null && batch.size() < requiredSize) {
+            // A caller is blocked in getNextBatch(): feed it directly and wake it once its batch is full.
+            batch.add(channelReader);
+            if (batch.size() == requiredSize) {
+                notifyAll();
+            }
+        } else {
+            partitions.add(channelReader);
+        }
+    }
+
+    /** Moves up to size queued readers into batch, then blocks until addPartition() has topped it up to size. */
+    @Override
+    public synchronized void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
+        // subList is a view: clearing it removes the handed-out readers from partitions.
+        List<IFrameReader> ready = partitions.subList(0, Math.min(size, partitions.size()));
+        batch.addAll(ready);
+        ready.clear();
+        if (batch.size() == size) {
+            return;
+        }
+        this.batch = batch;
+        this.requiredSize = size;
+        try {
+            while (batch.size() < size) {
+                wait();
+            }
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
+        } finally {
+            // Detach the caller's list even on interruption so later partitions are queued, not lost in it.
+            this.batch = null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
deleted file mode 100644
index af2670b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.collectors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class NonDeterministicPartitionCollector extends AbstractPartitionCollector {
- private static final Logger LOGGER = Logger.getLogger(NonDeterministicPartitionCollector.class.getName());
-
- private final FrameReader reader;
-
- private final BitSet expectedPartitions;
-
- private final int nSenderPartitions;
-
- private final IInputChannel[] channels;
-
- private final BitSet frameAvailability;
-
- private final int[] availableFrameCounts;
-
- private final BitSet eosSenders;
-
- private final BitSet failSenders;
-
- private BitSet closedSenders;
-
- private int lastReadSender;
-
- public NonDeterministicPartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId,
- int receiverIndex, int nSenderPartitions, BitSet expectedPartitions) {
- super(ctx, connectorId, receiverIndex);
- this.expectedPartitions = expectedPartitions;
- this.nSenderPartitions = nSenderPartitions;
- reader = new FrameReader();
- channels = new IInputChannel[nSenderPartitions];
- eosSenders = new BitSet(nSenderPartitions);
- failSenders = new BitSet(nSenderPartitions);
- closedSenders = new BitSet(nSenderPartitions);
- closedSenders.or(expectedPartitions);
- closedSenders.flip(0, nSenderPartitions);
- frameAvailability = new BitSet(nSenderPartitions);
- availableFrameCounts = new int[nSenderPartitions];
- }
-
- @Override
- public void open() throws HyracksException {
- lastReadSender = 0;
- }
-
- @Override
- public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
- for (PartitionChannel pc : partitions) {
- PartitionId pid = pc.getPartitionId();
- IInputChannel channel = pc.getInputChannel();
- channel.setAttachment(pid);
- channel.registerMonitor(reader);
- synchronized (this) {
- channels[pid.getSenderIndex()] = channel;
- }
- channel.open();
- }
- }
-
- @Override
- public IFrameReader getReader() throws HyracksException {
- return reader;
- }
-
- @Override
- public void close() throws HyracksException {
- }
-
- private final class FrameReader implements IFrameReader, IInputChannelMonitor {
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- findNextSender();
- if (lastReadSender >= 0) {
- ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
- FrameUtils.copy(srcFrame, buffer);
- channels[lastReadSender].recycleBuffer(srcFrame);
- return true;
- }
- return false;
- }
-
- private void findNextSender() throws HyracksDataException {
- synchronized (NonDeterministicPartitionCollector.this) {
- while (true) {
- switch (lastReadSender) {
- default:
- lastReadSender = frameAvailability.nextSetBit(lastReadSender + 1);
- if (lastReadSender >= 0) {
- break;
- }
- case 0:
- lastReadSender = frameAvailability.nextSetBit(0);
- }
- if (lastReadSender >= 0) {
- assert availableFrameCounts[lastReadSender] > 0;
- if (--availableFrameCounts[lastReadSender] == 0) {
- frameAvailability.clear(lastReadSender);
- }
- return;
- }
- if (!failSenders.isEmpty()) {
- throw new HyracksDataException("Failure occurred on input");
- }
- for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
- channels[i].close();
- eosSenders.clear(i);
- closedSenders.set(i);
- }
- int nextClosedBitIndex = closedSenders.nextClearBit(0);
- if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
- lastReadSender = -1;
- return;
- }
- try {
- NonDeterministicPartitionCollector.this.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- synchronized (NonDeterministicPartitionCollector.this) {
- for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
- .nextClearBit(i + 1)) {
- if (channels[i] != null) {
- channels[i].close();
- channels[i] = null;
- }
- }
- }
- }
-
- @Override
- public void notifyFailure(IInputChannel channel) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Failure: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
- }
- synchronized (NonDeterministicPartitionCollector.this) {
- failSenders.set(senderIndex);
- eosSenders.set(senderIndex);
- NonDeterministicPartitionCollector.this.notifyAll();
- }
- }
-
- @Override
- public void notifyDataAvailability(IInputChannel channel, int nFrames) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Data available: " + connectorId + " sender: " + senderIndex + " receiver: "
- + receiverIndex);
- }
- synchronized (NonDeterministicPartitionCollector.this) {
- availableFrameCounts[senderIndex] += nFrames;
- frameAvailability.set(senderIndex);
- NonDeterministicPartitionCollector.this.notifyAll();
- }
- }
-
- @Override
- public void notifyEndOfStream(IInputChannel channel) {
- PartitionId pid = (PartitionId) channel.getAttachment();
- int senderIndex = pid.getSenderIndex();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("EOS: " + connectorId + " sender: " + senderIndex + " receiver: " + receiverIndex);
- }
- synchronized (NonDeterministicPartitionCollector.this) {
- eosSenders.set(senderIndex);
- NonDeterministicPartitionCollector.this.notifyAll();
- }
- }
- }
-
- @Override
- public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
- Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
- for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
- c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
- }
- return c;
- }
-
- @Override
- public void abort() {
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
new file mode 100644
index 0000000..da5b0bc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionCollector extends AbstractPartitionCollector {
+ private final BitSet expectedPartitions;
+
+ private final IFrameReader frameReader;
+
+ private final IPartitionAcceptor pa;
+
+ public PartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
+ BitSet expectedPartitions, IFrameReader frameReader, IPartitionAcceptor pa) {
+ super(ctx, connectorId, receiverIndex);
+ this.expectedPartitions = expectedPartitions;
+ this.frameReader = frameReader;
+ this.pa = pa;
+ }
+
+ @Override
+ public void open() throws HyracksException {
+ }
+
+ @Override
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ for (PartitionChannel pc : partitions) {
+ PartitionId pid = pc.getPartitionId();
+ IInputChannel channel = pc.getInputChannel();
+ pa.addPartition(pid, channel);
+ channel.open();
+ }
+ }
+
+ @Override
+ public IFrameReader getReader() throws HyracksException {
+ return frameReader;
+ }
+
+ @Override
+ public void close() throws HyracksException {
+
+ }
+
+ @Override
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+ Collection<PartitionId> c = new ArrayList<PartitionId>(expectedPartitions.cardinality());
+ for (int i = expectedPartitions.nextSetBit(0); i >= 0; i = expectedPartitions.nextSetBit(i + 1)) {
+ c.add(new PartitionId(getJobId(), getConnectorId(), i, getReceiverIndex()));
+ }
+ return c;
+ }
+
+ @Override
+ public void abort() {
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
new file mode 100644
index 0000000..5f41069
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.collectors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+public class SortMergeFrameReader implements IFrameReader {
+    private final IHyracksTaskContext ctx;
+    private final int maxConcurrentMerges;
+    private final int nSenders;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final IPartitionBatchManager pbm;
+    private RunMergingFrameReader merger; // built by open(); still null if open() never got that far
+
+    public SortMergeFrameReader(IHyracksTaskContext ctx, int maxConcurrentMerges, int nSenders, int[] sortFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, IPartitionBatchManager pbm) {
+        this.ctx = ctx;
+        this.maxConcurrentMerges = maxConcurrentMerges;
+        this.nSenders = nSenders;
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDescriptor = recordDescriptor;
+        this.pbm = pbm;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (maxConcurrentMerges < nSenders) {
+            throw new HyracksDataException("Not yet supported"); // multi level merge.
+        }
+        List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
+        for (int i = 0; i < nSenders; ++i) {
+            inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
+        }
+        List<IFrameReader> batch = new ArrayList<IFrameReader>();
+        pbm.getNextBatch(batch, nSenders);
+        merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
+                comparators, recordDescriptor);
+        merger.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Limit before position: Buffer.position(n) throws IllegalArgumentException when n exceeds the limit.
+        buffer.limit(buffer.capacity());
+        buffer.position(buffer.capacity());
+        return merger.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (merger != null) { // open() may have failed before the merger was created
+            merger.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
deleted file mode 100644
index 96b2a69..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.collectors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
-
-public class SortMergePartitionCollector extends AbstractPartitionCollector {
- private final int[] sortFields;
-
- private final IBinaryComparator[] comparators;
-
- private final RecordDescriptor recordDescriptor;
-
- private final int maxConcurrentMerges;
-
- private final IInputChannel[] channels;
-
- private final int nSenders;
-
- private final boolean stable;
-
- private final FrameReader frameReader;
-
- private final PartitionBatchManager pbm;
-
- public SortMergePartitionCollector(IHyracksTaskContext ctx, ConnectorDescriptorId connectorId, int receiverIndex,
- int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDescriptor,
- int maxConcurrentMerges, int nSenders, boolean stable) {
- super(ctx, connectorId, receiverIndex);
- this.sortFields = sortFields;
- this.comparators = comparators;
- this.recordDescriptor = recordDescriptor;
- this.maxConcurrentMerges = maxConcurrentMerges;
- channels = new IInputChannel[nSenders];
- this.nSenders = nSenders;
- this.stable = stable;
- this.frameReader = new FrameReader();
- pbm = new NonDeterministicPartitionBatchManager();
- }
-
- @Override
- public void open() throws HyracksException {
- }
-
- @Override
- public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
- for (PartitionChannel pc : partitions) {
- PartitionId pid = pc.getPartitionId();
- IInputChannel channel = pc.getInputChannel();
- InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
- channel.registerMonitor(channelReader);
- channel.setAttachment(channelReader);
- int senderIndex = pid.getSenderIndex();
- synchronized (this) {
- channels[senderIndex] = channel;
- }
- pbm.addPartition(senderIndex);
- channel.open();
- }
- }
-
- @Override
- public IFrameReader getReader() throws HyracksException {
- return frameReader;
- }
-
- @Override
- public void close() throws HyracksException {
-
- }
-
- @Override
- public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
- Collection<PartitionId> requiredPartitionIds = new ArrayList<PartitionId>();
- for (int i = 0; i < nSenders; ++i) {
- requiredPartitionIds.add(new PartitionId(getJobId(), getConnectorId(), i, receiverIndex));
- }
- return requiredPartitionIds;
- }
-
- @Override
- public void abort() {
-
- }
-
- private abstract class PartitionBatchManager {
- protected abstract void addPartition(int index);
-
- protected abstract void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException;
- }
-
- private class NonDeterministicPartitionBatchManager extends PartitionBatchManager {
- private List<IFrameReader> partitions;
-
- private List<IFrameReader> batch;
-
- private int requiredSize;
-
- public NonDeterministicPartitionBatchManager() {
- partitions = new ArrayList<IFrameReader>();
- }
-
- @Override
- protected void addPartition(int index) {
- synchronized (SortMergePartitionCollector.this) {
- if (batch != null && batch.size() < requiredSize) {
- batch.add((IFrameReader) channels[index].getAttachment());
- if (batch.size() == requiredSize) {
- SortMergePartitionCollector.this.notifyAll();
- }
- } else {
- partitions.add((IFrameReader) channels[index].getAttachment());
- }
- }
- }
-
- @Override
- protected void getNextBatch(List<IFrameReader> batch, int size) throws HyracksDataException {
- synchronized (SortMergePartitionCollector.this) {
- if (partitions.size() <= size) {
- batch.addAll(partitions);
- partitions.clear();
- } else if (partitions.size() > size) {
- List<IFrameReader> sublist = partitions.subList(0, size);
- batch.addAll(sublist);
- sublist.clear();
- }
- if (batch.size() == size) {
- return;
- }
- this.batch = batch;
- this.requiredSize = size;
- while (batch.size() < size) {
- try {
- SortMergePartitionCollector.this.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- this.batch = null;
- }
- }
- }
-
- private static class InputChannelFrameReader implements IFrameReader, IInputChannelMonitor {
- private final IInputChannel channel;
-
- private int availableFrames;
-
- private boolean eos;
-
- private boolean failed;
-
- public InputChannelFrameReader(IInputChannel channel) {
- this.channel = channel;
- availableFrames = 0;
- eos = false;
- failed = false;
- }
-
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- synchronized (this) {
- while (!failed && !eos && availableFrames <= 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- if (failed) {
- throw new HyracksDataException("Failure occurred on input");
- }
- if (availableFrames <= 0 && eos) {
- return false;
- }
- --availableFrames;
- }
- ByteBuffer srcBuffer = channel.getNextBuffer();
- FrameUtils.copy(srcBuffer, buffer);
- channel.recycleBuffer(srcBuffer);
- return true;
- }
-
- @Override
- public void close() throws HyracksDataException {
-
- }
-
- @Override
- public synchronized void notifyFailure(IInputChannel channel) {
- failed = true;
- notifyAll();
- }
-
- @Override
- public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
- availableFrames += nFrames;
- notifyAll();
- }
-
- @Override
- public synchronized void notifyEndOfStream(IInputChannel channel) {
- eos = true;
- notifyAll();
- }
- }
-
- private class FrameReader implements IFrameReader {
- private RunMergingFrameReader merger;
-
- @Override
- public void open() throws HyracksDataException {
- if (maxConcurrentMerges >= nSenders) {
- List<ByteBuffer> inFrames = new ArrayList<ByteBuffer>();
- for (int i = 0; i < nSenders; ++i) {
- inFrames.add(ByteBuffer.allocate(ctx.getFrameSize()));
- }
- List<IFrameReader> batch = new ArrayList<IFrameReader>();
- pbm.getNextBatch(batch, nSenders);
- merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames,
- sortFields, comparators, recordDescriptor);
- } else {
- // multi level merge.
- throw new HyracksDataException("Not yet supported");
- }
- merger.open();
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- buffer.position(buffer.capacity());
- buffer.limit(buffer.capacity());
- return merger.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- merger.close();
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7d92150..0e1e413 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -25,7 +25,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
@@ -75,8 +77,11 @@
if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions))
expectedPartitions.set(i);
}
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), receiverIndex, nProducerPartitions,
+ NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
expectedPartitions);
+ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+ return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
+ channelReader);
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 12935af..291e149 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -25,7 +25,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -50,7 +52,9 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
expectedPartitions);
+ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 54979f0..2576423 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.hyracks.dataflow.std.connectors;
+import java.util.BitSet;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -25,7 +28,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergePartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.SortMergeFrameReader;
public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -65,7 +71,11 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- return new SortMergePartitionCollector(ctx, getConnectorId(), index, sortFields, comparators, recordDesc,
- nProducerPartitions, nProducerPartitions, stable);
+ IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
+ IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
+ sortFields, comparators, recordDesc, pbm);
+ BitSet expectedPartitions = new BitSet();
+ expectedPartitions.set(0, nProducerPartitions);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index ba66ebc..fd67f77 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -25,7 +25,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
public MToNReplicatingConnectorDescriptor(JobSpecification spec) {
@@ -82,7 +84,9 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
expectedPartitions);
+ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 7c227e2..e8d9751 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -30,7 +30,9 @@
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicPartitionCollector;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.collectors.PartitionCollector;
public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -51,8 +53,10 @@
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(index);
- return new NonDeterministicPartitionCollector(ctx, getConnectorId(), index, nProducerPartitions,
+ NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
expectedPartitions);
+ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+ return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
}
@Override