Reformatted the code according to the Hyracks code format profile.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_spilling_groupby@309 123451ca-8445-de46-9d55-352943316053
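
For context, an illustrative sketch (not part of this patch) of how the aggregator factories reformatted below are typically composed; the field indexes and the no-argument CountAggregatorFactory constructor are assumptions for illustration only:

    IFieldValueResultingAggregatorFactory[] fieldAggregators =
            new IFieldValueResultingAggregatorFactory[] {
                    new CountAggregatorFactory(),         // per-group tuple count (assumed no-arg constructor)
                    new SumAggregatorFactory(1),          // integer SUM over field 1
                    new FloatSumAggregatorFactory(2),     // float SUM over field 2
                    new MinMaxAggregatorFactory(true, 3)  // MAX over field 3
            };
    // MultiAggregatorFactory bundles the per-field aggregators into a single
    // IAccumulatingAggregatorFactory usable by the grouping operators.
    IAccumulatingAggregatorFactory aggregatorFactory = new MultiAggregatorFactory(fieldAggregators);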
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
index e036470..a99cba9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
@@ -50,44 +50,48 @@
};
}
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
- private int count;
+ @Override
+ public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+ return new ISpillableFieldValueResultingAggregator() {
+ private int count;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeInt(count);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
- throws HyracksDataException {
- count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2 + accessor.getFieldStartOffset(tIndex, fIndex));
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ count = IntegerSerializerDeserializer.getInt(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- count++;
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ count++;
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- count = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ count = 0;
+ }
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2 + accessor.getFieldStartOffset(tIndex, fIndex));
- }
- };
- }
+ @Override
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ count += IntegerSerializerDeserializer.getInt(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
index 0b02378..7e6bc8c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
@@ -25,123 +25,111 @@
* SUM aggregator on float type data.
*
* @author jarodwen
- *
*/
-public class FloatSumAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
+public class FloatSumAggregatorFactory implements IFieldValueResultingAggregatorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
- private int sumField;
+ private static final long serialVersionUID = 1L;
+ private int sumField;
- public FloatSumAggregatorFactory(int field) {
- this.sumField = field;
- }
+ public FloatSumAggregatorFactory(int field) {
+ this.sumField = field;
+ }
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+ * IFieldValueResultingAggregatorFactory
+ * #createFieldValueResultingAggregator()
+ */
+ @Override
+ public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+ return new IFieldValueResultingAggregator() {
- private float sum;
+ private float sum;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeFloat(sum);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ sum = 0;
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+ sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart);
+ }
+ };
+ }
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createSpillableFieldValueResultingAggregator()
- */
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+ * IFieldValueResultingAggregatorFactory
+ * #createSpillableFieldValueResultingAggregator()
+ */
+ @Override
+ public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+ return new ISpillableFieldValueResultingAggregator() {
- private float sum;
+ private float sum;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeFloat(sum);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ sum = FloatSerializerDeserializer.getFloat(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ sum = 0;
+ }
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum += FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ @Override
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ sum += FloatSerializerDeserializer.getFloat(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+ sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart);
+ }
+ };
+ }
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
index b3ce8dd..1b7da7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
@@ -18,6 +18,6 @@
public interface IFieldValueResultingAggregatorFactory extends Serializable {
public IFieldValueResultingAggregator createFieldValueResultingAggregator();
-
+
public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator();
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
index 95d4ac9..d52f458 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
@@ -20,32 +20,29 @@
/**
* An extended version of the {@link IFieldValueResultingAggregator} supporting
* external aggregation.
- *
*/
-public interface ISpillableFieldValueResultingAggregator extends
- IFieldValueResultingAggregator {
+public interface ISpillableFieldValueResultingAggregator extends IFieldValueResultingAggregator {
- /**
- * Called once per aggregator before calling accumulate for the first time.
- *
- * @param accessor
- * - Accessor to the data tuple.
- * @param tIndex
- * - Index of the tuple in the accessor.
- * @throws HyracksDataException
- */
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
- int fIndex) throws HyracksDataException;
+ /**
+ * Called once per aggregator before calling accumulate for the first time.
+ *
+ * @param accessor
+ * - Accessor to the data tuple.
+ * @param tIndex
+ * - Index of the tuple in the accessor.
+ * @throws HyracksDataException
+ */
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException;
- /**
- * Aggregate another partial result.
- *
- * @param accessor
- * @param tIndex
- * @param fIndex
- * @throws HyracksDataException
- */
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException;
+ /**
+ * Aggregate another partial result.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param fIndex
+ * @throws HyracksDataException
+ */
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
index 95740b2..0c3b0d5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
@@ -23,127 +23,115 @@
/**
* Min/Max aggregator factory
- *
*/
-public class MinMaxAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
+public class MinMaxAggregatorFactory implements IFieldValueResultingAggregatorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- /**
- * indicate the type of the value: true: max false: min
- */
- private boolean type;
+ /**
+ * indicate the type of the value: true: max false: min
+ */
+ private boolean type;
- /**
- * The field to be aggregated.
- */
- private int field;
+ /**
+ * The field to be aggregated.
+ */
+ private int field;
- public MinMaxAggregatorFactory(boolean type, int field) {
- this.type = type;
- this.field = field;
- }
+ public MinMaxAggregatorFactory(boolean type, int field) {
+ this.type = type;
+ this.field = field;
+ }
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
+ @Override
+ public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+ return new IFieldValueResultingAggregator() {
- private float minmax;
+ private float minmax;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(minmax);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeFloat(minmax);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- minmax = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ minmax = 0;
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, field);
- float nval = FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- if ((type ? (nval > minmax) : (nval < minmax))) {
- minmax = nval;
- }
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, field);
+ float nval = FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2
+ * fieldCount + fieldStart);
+ if ((type ? (nval > minmax) : (nval < minmax))) {
+ minmax = nval;
+ }
+ }
+ };
+ }
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
+ @Override
+ public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+ return new ISpillableFieldValueResultingAggregator() {
- private float minmax;
+ private float minmax;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeFloat(minmax);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeFloat(minmax);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- minmax = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ minmax = FloatSerializerDeserializer.getFloat(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- minmax = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ minmax = 0;
+ }
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- minmax = FloatSerializerDeserializer.getFloat(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
+ @Override
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ minmax = FloatSerializerDeserializer.getFloat(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, field);
- float nval = FloatSerializerDeserializer.getFloat(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- if ((type ? (nval > minmax) : (nval < minmax))) {
- minmax = nval;
- }
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, field);
+ float nval = FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(), tupleOffset + 2
+ * fieldCount + fieldStart);
+ if ((type ? (nval > minmax) : (nval < minmax))) {
+ minmax = nval;
+ }
+ }
+ };
+ }
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
index b237d63..957c453 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
@@ -27,170 +27,149 @@
import edu.uci.ics.hyracks.dataflow.std.group.ISpillableAccumulatingAggregator;
public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private IFieldValueResultingAggregatorFactory[] aFactories;
+ private IFieldValueResultingAggregatorFactory[] aFactories;
- public MultiAggregatorFactory(
- IFieldValueResultingAggregatorFactory[] aFactories) {
- this.aFactories = aFactories;
- }
+ public MultiAggregatorFactory(IFieldValueResultingAggregatorFactory[] aFactories) {
+ this.aFactories = aFactories;
+ }
- @Override
- public ISpillableAccumulatingAggregator createSpillableAggregator(
- IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
- final RecordDescriptor outRecordDescriptor) {
- final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
- for (int i = 0; i < aFactories.length; ++i) {
- aggregators[i] = aFactories[i]
- .createSpillableFieldValueResultingAggregator();
- }
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- return new ISpillableAccumulatingAggregator() {
- private boolean pending;
+ @Override
+ public ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
+ RecordDescriptor inRecordDesc, final RecordDescriptor outRecordDescriptor) {
+ final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
+ for (int i = 0; i < aFactories.length; ++i) {
+ aggregators[i] = aFactories[i].createSpillableFieldValueResultingAggregator();
+ }
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ return new ISpillableAccumulatingAggregator() {
+ private boolean pending;
- @Override
- public boolean output(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex,
- int[] keyFieldIndexes) throws HyracksDataException {
- if (!pending) {
- tb.reset();
- for (int i = 0; i < keyFieldIndexes.length; ++i) {
- tb.addField(accessor, tIndex, keyFieldIndexes[i]);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tb.addFieldEndOffset();
- }
- }
- if (!appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
+ @Override
+ public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
+ int[] keyFieldIndexes) throws HyracksDataException {
+ if (!pending) {
+ tb.reset();
+ for (int i = 0; i < keyFieldIndexes.length; ++i) {
+ tb.addField(accessor, tIndex, keyFieldIndexes[i]);
+ }
+ DataOutput dos = tb.getDataOutput();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].output(dos);
+ tb.addFieldEndOffset();
+ }
+ }
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ pending = true;
+ return false;
+ }
+ return true;
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].init(accessor, tIndex);
- }
- pending = false;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ tb.reset();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].init(accessor, tIndex);
+ }
+ pending = false;
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].accumulate(accessor, tIndex);
- }
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].accumulate(accessor, tIndex);
+ }
+ }
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].initFromPartial(accessor, tIndex,
- keyFieldIndexes.length + i);
- }
- pending = false;
- }
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+ throws HyracksDataException {
+ tb.reset();
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].initFromPartial(accessor, tIndex, keyFieldIndexes.length + i);
+ }
+ pending = false;
+ }
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int[] keyFieldIndexes)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].accumulatePartialResult(accessor, tIndex,
- keyFieldIndexes.length + i);
- }
+ @Override
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+ throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].accumulatePartialResult(accessor, tIndex, keyFieldIndexes.length + i);
+ }
- }
+ }
- @Override
- public boolean output(FrameTupleAppender appender,
- ArrayTupleBuilder tbder) throws HyracksDataException {
- if (!pending) {
- // TODO Here to be fixed:
- DataOutput dos = tbder.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tbder.addFieldEndOffset();
- }
- }
- if (!appender.append(tbder.getFieldEndOffsets(),
- tbder.getByteArray(), 0, tbder.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
- };
- }
+ @Override
+ public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException {
+ if (!pending) {
+ // TODO Here to be fixed:
+ DataOutput dos = tbder.getDataOutput();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].output(dos);
+ tbder.addFieldEndOffset();
+ }
+ }
+ if (!appender.append(tbder.getFieldEndOffsets(), tbder.getByteArray(), 0, tbder.getSize())) {
+ pending = true;
+ return false;
+ }
+ return true;
+ }
+ };
+ }
- @Override
- public IAccumulatingAggregator createAggregator(
- IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException {
- final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
- for (int i = 0; i < aFactories.length; ++i) {
- aggregators[i] = aFactories[i]
- .createFieldValueResultingAggregator();
- }
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- return new IAccumulatingAggregator() {
+ @Override
+ public IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+ final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
+ for (int i = 0; i < aFactories.length; ++i) {
+ aggregators[i] = aFactories[i].createFieldValueResultingAggregator();
+ }
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ return new IAccumulatingAggregator() {
- private boolean pending;
+ private boolean pending;
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- tb.reset();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].init(accessor, tIndex);
- }
- pending = false;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ tb.reset();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].init(accessor, tIndex);
+ }
+ pending = false;
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].accumulate(accessor, tIndex);
- }
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].accumulate(accessor, tIndex);
+ }
+ }
- @Override
- public boolean output(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex,
- int[] keyFieldIndexes) throws HyracksDataException {
- if (!pending) {
- tb.reset();
- for (int i = 0; i < keyFieldIndexes.length; ++i) {
- tb.addField(accessor, tIndex, keyFieldIndexes[i]);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].output(dos);
- tb.addFieldEndOffset();
- }
- }
- if (!appender.append(tb.getFieldEndOffsets(),
- tb.getByteArray(), 0, tb.getSize())) {
- pending = true;
- return false;
- }
- return true;
- }
+ @Override
+ public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
+ int[] keyFieldIndexes) throws HyracksDataException {
+ if (!pending) {
+ tb.reset();
+ for (int i = 0; i < keyFieldIndexes.length; ++i) {
+ tb.addField(accessor, tIndex, keyFieldIndexes[i]);
+ }
+ DataOutput dos = tb.getDataOutput();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].output(dos);
+ tb.addFieldEndOffset();
+ }
+ }
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ pending = true;
+ return false;
+ }
+ return true;
+ }
- };
- }
+ };
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
index cfb5293..64335f1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
@@ -24,123 +24,111 @@
/**
* SUM aggregator factory (for integer only; another SUM aggregator for floats
* is available at {@link FloatSumAggregatorFactory})
- *
*/
-public class SumAggregatorFactory implements
- IFieldValueResultingAggregatorFactory {
+public class SumAggregatorFactory implements IFieldValueResultingAggregatorFactory {
- private int sumField;
+ private int sumField;
- public SumAggregatorFactory(int field) {
- sumField = field;
- }
+ public SumAggregatorFactory(int field) {
+ sumField = field;
+ }
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
- * IFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
- return new IFieldValueResultingAggregator() {
- private int sum;
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
+ * IFieldValueResultingAggregatorFactory
+ * #createFieldValueResultingAggregator()
+ */
+ @Override
+ public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+ return new IFieldValueResultingAggregator() {
+ private int sum;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeInt(sum);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeInt(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum++;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ sum++;
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart);
+ }
+ };
+ }
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
- * ISpillableFieldValueResultingAggregatorFactory
- * #createFieldValueResultingAggregator()
- */
- @Override
- public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
- return new ISpillableFieldValueResultingAggregator() {
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
+ * ISpillableFieldValueResultingAggregatorFactory
+ * #createFieldValueResultingAggregator()
+ */
+ @Override
+ public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
+ return new ISpillableFieldValueResultingAggregator() {
- private int sum;
+ private int sum;
- @Override
- public void output(DataOutput resultAcceptor)
- throws HyracksDataException {
- try {
- resultAcceptor.writeInt(sum);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
+ @Override
+ public void output(DataOutput resultAcceptor) throws HyracksDataException {
+ try {
+ resultAcceptor.writeInt(sum);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
- @Override
- public void initFromPartial(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum = IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ sum = IntegerSerializerDeserializer.getInt(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
+ }
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- sum = 0;
- }
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ sum = 0;
+ }
- @Override
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int fIndex) throws HyracksDataException {
- sum += IntegerSerializerDeserializer.getInt(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex)
- + accessor.getFieldCount() * 2
- + accessor.getFieldStartOffset(tIndex, fIndex));
- }
+ @Override
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
+ throws HyracksDataException {
+ sum += IntegerSerializerDeserializer.getInt(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ + accessor.getFieldStartOffset(tIndex, fIndex));
+ }
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldCount = accessor.getFieldCount();
- int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(), tupleOffset + 2 * fieldCount
- + fieldStart);
- }
- };
- }
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldCount = accessor.getFieldCount();
+ int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
+ sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+ + fieldStart);
+ }
+ };
+ }
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 1bad6a7..530207d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -33,111 +33,104 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
/**
- * File writer to output plain text.
- *
+ * File writer to output plain text.
*/
-public class PlainFileWriterOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class PlainFileWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private IFileSplitProvider fileSplitProvider;
-
- private String delim;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- /**
- * @param spec
- * @param inputArity
- * @param outputArity
- */
- public PlainFileWriterOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider, String delim) {
- super(spec, 1, 0);
- this.fileSplitProvider = fileSplitProvider;
- this.delim = delim;
- }
+ private IFileSplitProvider fileSplitProvider;
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
- */
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx,
- IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
- // Output files
- final FileSplit[] splits = fileSplitProvider.getFileSplits();
- // Frame accessor
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
- // Record descriptor
- final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private BufferedWriter out;
-
- private ByteBufferInputStream bbis;
-
- private DataInputStream di;
-
- @Override
- public void open() throws HyracksDataException {
- try {
+ private String delim;
+
+ /**
+ * @param spec
+ * @param inputArity
+ * @param outputArity
+ */
+ public PlainFileWriterOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider, String delim) {
+ super(spec, 1, 0);
+ this.fileSplitProvider = fileSplitProvider;
+ this.delim = delim;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+ */
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ // Output files
+ final FileSplit[] splits = fileSplitProvider.getFileSplits();
+ // Frame accessor
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ // Record descriptor
+ final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private BufferedWriter out;
+
+ private ByteBufferInputStream bbis;
+
+ private DataInputStream di;
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
out = new BufferedWriter(new FileWriter(splits[partition].getLocalFile().getFile()));
bbis = new ByteBufferInputStream();
di = new DataInputStream(bbis);
} catch (Exception e) {
throw new HyracksDataException(e);
}
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- try {
- frameTupleAccessor.reset(buffer);
- for (int tIndex = 0; tIndex < frameTupleAccessor
- .getTupleCount(); tIndex++) {
- int start = frameTupleAccessor
- .getTupleStartOffset(tIndex)
- + frameTupleAccessor.getFieldSlotsLength();
- bbis.setByteBuffer(buffer, start);
- Object[] record = new Object[recordDescriptor
- .getFields().length];
- for (int i = 0; i < record.length; ++i) {
- Object instance = recordDescriptor.getFields()[i]
- .deserialize(di);
- if (i == 0) {
- out.write(String.valueOf(instance));
- } else {
- out.write(delim + String.valueOf(instance));
- }
- }
- out.write("\n");
- }
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- try {
- out.flush();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- out.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ frameTupleAccessor.reset(buffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+ + frameTupleAccessor.getFieldSlotsLength();
+ bbis.setByteBuffer(buffer, start);
+ Object[] record = new Object[recordDescriptor.getFields().length];
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (i == 0) {
+ out.write(String.valueOf(instance));
+ } else {
+ out.write(delim + String.valueOf(instance));
+ }
+ }
+ out.write("\n");
+ }
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
index af78b7d..b13b087 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
@@ -52,717 +52,641 @@
/**
* This is an implementation of the external hash group operator.
- *
* The motivation of this operator is that when tuples are processed in
* parallel, distinguished aggregating keys partitioned on one node may exceed
* the main memory, so aggregation results should be output onto the disk to
* make space for aggregating more input tuples.
- *
- *
*/
-public class ExternalHashGroupOperatorDescriptor extends
- AbstractOperatorDescriptor {
+public class ExternalHashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
- /**
- * The input frame identifier (in the job environment)
- */
- private static final String GROUPTABLES = "gtables";
+ /**
+ * The input frame identifier (in the job environment)
+ */
+ private static final String GROUPTABLES = "gtables";
- /**
- * The runs files identifier (in the job environment)
- */
- private static final String RUNS = "runs";
+ /**
+ * The runs files identifier (in the job environment)
+ */
+ private static final String RUNS = "runs";
- /**
- * The fields used for grouping (grouping keys).
- */
- private final int[] keyFields;
+ /**
+ * The fields used for grouping (grouping keys).
+ */
+ private final int[] keyFields;
- /**
- * The comparator for checking the grouping conditions, corresponding to the
- * {@link #keyFields}.
- */
- private final IBinaryComparatorFactory[] comparatorFactories;
+ /**
+ * The comparator for checking the grouping conditions, corresponding to the {@link #keyFields}.
+ */
+ private final IBinaryComparatorFactory[] comparatorFactories;
- /**
- * The aggregator factory for the aggregating field, corresponding to the
- * {@link #aggregateFields}.
- */
- private IAccumulatingAggregatorFactory aggregatorFactory;
+ /**
+ * The aggregator factory for the aggregating field, corresponding to the {@link #aggregateFields}.
+ */
+ private IAccumulatingAggregatorFactory aggregatorFactory;
- /**
- * The maximum number of frames in the main memory.
- */
- private final int framesLimit;
+ /**
+ * The maximum number of frames in the main memory.
+ */
+ private final int framesLimit;
- /**
- * Indicate whether the final output will be sorted or not.
- */
- private final boolean sortOutput;
+ /**
+ * Indicate whether the final output will be sorted or not.
+ */
+ private final boolean sortOutput;
- /**
- * Partition computer factory
- */
- private final ITuplePartitionComputerFactory tpcf;
+ /**
+ * Partition computer factory
+ */
+ private final ITuplePartitionComputerFactory tpcf;
- /**
- * The size of the in-memory table, which should be specified now by the
- * creator of this operator descriptor.
- */
- private final int tableSize;
+ /**
+ * The size of the in-memory table, which should be specified now by the
+ * creator of this operator descriptor.
+ */
+ private final int tableSize;
- /**
- * XXX Logger for debug information
- */
- private static Logger LOGGER = Logger
- .getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
+ /**
+ * XXX Logger for debug information
+ */
+ private static Logger LOGGER = Logger.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
- /**
- * Constructor of the external hash group operator descriptor.
- *
- * @param spec
- * @param keyFields
- * The fields as keys of grouping.
- * @param framesLimit
- * The maximum number of frames to be used in memory.
- * @param sortOutput
- * Whether the output should be sorted or not. Note that if the
- * input data is large enough for external grouping, the output
- * will be sorted surely. The only case that when the output is
- * not sorted is when the size of the input data can be grouped
- * in memory and this parameter is false.
- * @param tpcf
- * The partitioner.
- * @param comparatorFactories
- * The comparators.
- * @param aggregatorFactory
- * The aggregators.
- * @param recordDescriptor
- * The record descriptor for the input data.
- * @param tableSize
- * The maximum size of the in memory table usable to this
- * operator.
- */
- public ExternalHashGroupOperatorDescriptor(JobSpecification spec,
- int[] keyFields, int framesLimit, boolean sortOutput,
- ITuplePartitionComputerFactory tpcf,
- IBinaryComparatorFactory[] comparatorFactories,
- IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, int tableSize) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- if (framesLimit <= 1) {
- // Minimum of 2 frames: 1 for input records, and 1 for output
- // aggregation results.
- throw new IllegalStateException();
- }
- this.aggregatorFactory = aggregatorFactory;
- this.keyFields = keyFields;
- this.comparatorFactories = comparatorFactories;
+ /**
+ * Constructor of the external hash group operator descriptor.
+ *
+ * @param spec
+ * @param keyFields
+ * The fields as keys of grouping.
+ * @param framesLimit
+ * The maximum number of frames to be used in memory.
+ * @param sortOutput
+ * Whether the output should be sorted or not. Note that if the
+ * input data is large enough to require external grouping, the
+ * output will always be sorted. The only case in which the output
+ * is not sorted is when the input data can be grouped entirely in
+ * memory and this parameter is false.
+ * @param tpcf
+ * The partitioner.
+ * @param comparatorFactories
+ * The comparators.
+ * @param aggregatorFactory
+ * The aggregators.
+ * @param recordDescriptor
+ * The record descriptor for the input data.
+ * @param tableSize
+ * The maximum size of the in memory table usable to this
+ * operator.
+ */
+ public ExternalHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ boolean sortOutput, ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+ IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor recordDescriptor, int tableSize) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ if (framesLimit <= 1) {
+ // Minimum of 2 frames: 1 for input records, and 1 for output
+ // aggregation results.
+ throw new IllegalStateException();
+ }
+ this.aggregatorFactory = aggregatorFactory;
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
- this.sortOutput = sortOutput;
+ this.sortOutput = sortOutput;
- this.tpcf = tpcf;
+ this.tpcf = tpcf;
- this.tableSize = tableSize;
+ this.tableSize = tableSize;
- // Set the record descriptor. Note that since this operator is a unary
- // operator,
- // only the first record descritpor is used here.
- recordDescriptors[0] = recordDescriptor;
- }
+ // Set the record descriptor. Note that since this operator is a unary
+ // operator,
+ // only the first record descriptor is used here.
+ recordDescriptors[0] = recordDescriptor;
+ }
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- /*
- * (non-Javadoc)
- *
- * @see
- * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
- * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
- */
- @Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
- MergeActivity mergeAct = new MergeActivity();
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
+ * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+ */
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
+ MergeActivity mergeAct = new MergeActivity();
- builder.addTask(partialAggAct);
- builder.addSourceEdge(0, partialAggAct, 0);
+ builder.addTask(partialAggAct);
+ builder.addSourceEdge(0, partialAggAct, 0);
- builder.addTask(mergeAct);
- builder.addTargetEdge(0, mergeAct, 0);
+ builder.addTask(mergeAct);
+ builder.addTargetEdge(0, mergeAct, 0);
- // FIXME Block or not?
- builder.addBlockingEdge(partialAggAct, mergeAct);
+ // FIXME Block or not?
+ builder.addBlockingEdge(partialAggAct, mergeAct);
- }
+ }
- private class PartialAggregateActivity extends AbstractActivityNode {
+ private class PartialAggregateActivity extends AbstractActivityNode {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksStageletContext ctx,
- final IOperatorEnvironment env,
- final IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) {
- // Create the in-memory hash table
- final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(
- ctx, keyFields, comparatorFactories, tpcf,
- aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0],
- // Always take one frame for the input records
- framesLimit - 1, tableSize);
- // Create the tuple accessor
- final FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
- // Create the partial aggregate activity node
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ // Create the in-memory hash table
+ final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(ctx, keyFields,
+ comparatorFactories, tpcf, aggregatorFactory, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0), recordDescriptors[0],
+ // Always take one frame for the input records
+ framesLimit - 1, tableSize);
+ // Create the tuple accessor
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ // Create the partial aggregate activity node
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- /**
- * Run files
- */
- private LinkedList<RunFileReader> runs;
+ /**
+ * Run files
+ */
+ private LinkedList<RunFileReader> runs;
- @Override
- public void close() throws HyracksDataException {
- if (gTable.getFrameCount() >= 0) {
- if (runs.size() <= 0) {
- // All in memory
- env.set(GROUPTABLES, gTable);
- } else {
- // flush the memory into the run file.
- flushFramesToRun();
- }
- }
- env.set(RUNS, runs);
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (gTable.getFrameCount() >= 0) {
+ if (runs.size() <= 0) {
+ // All in memory
+ env.set(GROUPTABLES, gTable);
+ } else {
+ // flush the memory into the run file.
+ flushFramesToRun();
+ }
+ }
+ env.set(RUNS, runs);
+ }
- @Override
- public void flush() throws HyracksDataException {
+ @Override
+ public void flush() throws HyracksDataException {
- }
+ }
- /**
- * Process the next input buffer.
- *
- * The actual insertion is processed in {@link #gTable}. It will
- * check whether it is possible to contain the data into the
- * main memory or not. If not, it will indicate the operator to
- * flush the content of the table into a run file.
- */
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- // If the group table is too large, flush the table into
- // a run file.
- if (!gTable.insert(accessor, i)) {
- flushFramesToRun();
- if (!gTable.insert(accessor, i))
- throw new HyracksDataException(
- "Failed to insert a new buffer into the aggregate operator!");
- }
- }
+ /**
+ * Process the next input buffer.
+ * The actual insertion is handled by {@link #gTable}, which
+ * checks whether the data still fits into main memory. If not,
+ * it signals the operator to flush the content of the table
+ * into a run file.
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ // If the group table is too large, flush the table into
+ // a run file.
+ if (!gTable.insert(accessor, i)) {
+ flushFramesToRun();
+ if (!gTable.insert(accessor, i))
+ throw new HyracksDataException(
+ "Failed to insert a new buffer into the aggregate operator!");
+ }
+ }
- }
+ }
- @Override
- public void open() throws HyracksDataException {
- runs = new LinkedList<RunFileReader>();
- gTable.reset();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ runs = new LinkedList<RunFileReader>();
+ gTable.reset();
+ }
- /**
- * Flush the content of the group table into a run file.
- *
- * During the flushing, the hash table will be sorted as first.
- * After that, a run file handler is initialized and the hash
- * table is flushed into the run file.
- *
- * @throws HyracksDataException
- */
- private void flushFramesToRun() throws HyracksDataException {
- // Sort the contents of the hash table.
- gTable.sortFrames();
- FileReference runFile;
- try {
- runFile = ctx.getJobletContext().createWorkspaceFile(
- ExternalHashGroupOperatorDescriptor.class
- .getSimpleName());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile,
- ctx.getIOManager());
- writer.open();
- try {
- gTable.flushFrames(writer, true);
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- writer.close();
- }
- gTable.reset();
- runs.add(((RunFileWriter) writer).createReader());
- LOGGER.warning("Created run file: "
- + runFile.getFile().getAbsolutePath());
- }
+ /**
+ * Flush the content of the group table into a run file.
+ * During the flush, the hash table is sorted first. After
+ * that, a run file writer is created and the hash table is
+ * flushed into it.
+ *
+ * @throws HyracksDataException
+ */
+ private void flushFramesToRun() throws HyracksDataException {
+ // Sort the contents of the hash table.
+ gTable.sortFrames();
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext().createWorkspaceFile(
+ ExternalHashGroupOperatorDescriptor.class.getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+ writer.open();
+ try {
+ gTable.flushFrames(writer, true);
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ writer.close();
+ }
+ gTable.reset();
+ runs.add(((RunFileWriter) writer).createReader());
+ LOGGER.warning("Created run file: " + runFile.getFile().getAbsolutePath());
+ }
- };
+ };
- return op;
- }
+ return op;
+ }
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return ExternalHashGroupOperatorDescriptor.this;
+ }
- }
+ }
- private class MergeActivity extends AbstractActivityNode {
+ private class MergeActivity extends AbstractActivityNode {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksStageletContext ctx,
- final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
- }
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- /**
- * Input frames, one for each run file.
- */
- private List<ByteBuffer> inFrames;
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ /**
+ * Input frames, one for each run file.
+ */
+ private List<ByteBuffer> inFrames;
- /**
- * Output frame.
- */
- private ByteBuffer outFrame;
+ /**
+ * Output frame.
+ */
+ private ByteBuffer outFrame;
- /**
- * List of the run files to be merged
- */
- LinkedList<RunFileReader> runs;
+ /**
+ * List of the run files to be merged
+ */
+ LinkedList<RunFileReader> runs;
- /**
- * Tuple appender for the output frame {@link #outFrame}.
- */
- private FrameTupleAppender outFrameAppender;
+ /**
+ * Tuple appender for the output frame {@link #outFrame}.
+ */
+ private FrameTupleAppender outFrameAppender;
- private ISpillableAccumulatingAggregator visitingAggregator;
- private ArrayTupleBuilder visitingKeyTuple;
+ private ISpillableAccumulatingAggregator visitingAggregator;
+ private ArrayTupleBuilder visitingKeyTuple;
- @SuppressWarnings("unchecked")
- @Override
- public void initialize() throws HyracksDataException {
- runs = (LinkedList<RunFileReader>) env.get(RUNS);
- writer.open();
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ runs = (LinkedList<RunFileReader>) env.get(RUNS);
+ writer.open();
- try {
- if (runs.size() <= 0) {
- // If the aggregate results can be fit into
- // memory...
- SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env
- .get(GROUPTABLES);
- if (gTable != null) {
- gTable.flushFrames(writer, sortOutput);
- }
- env.set(GROUPTABLES, null);
- } else {
- // Otherwise, merge the run files into a single file
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.allocateFrame());
- }
- int passCount = 0;
- while (runs.size() > 0) {
- passCount++;
- try {
- doPass(runs, passCount);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- }
+ try {
+ if (runs.size() <= 0) {
+ // If the aggregate results can be fit into
+ // memory...
+ SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env.get(GROUPTABLES);
+ if (gTable != null) {
+ gTable.flushFrames(writer, sortOutput);
+ }
+ env.set(GROUPTABLES, null);
+ } else {
+ // Otherwise, merge the run files into a single file
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAppender.reset(outFrame, true);
+ for (int i = 0; i < framesLimit - 1; ++i) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ int passCount = 0;
+ while (runs.size() > 0) {
+ passCount++;
+ try {
+ doPass(runs, passCount);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
- } finally {
- writer.close();
- }
- env.set(RUNS, null);
- }
+ } finally {
+ writer.close();
+ }
+ env.set(RUNS, null);
+ }
- /**
- * Merge the run files once.
- *
- * @param runs
- * @param passCount
- * @throws HyracksDataException
- * @throws IOException
- */
- private void doPass(LinkedList<RunFileReader> runs,
- int passCount) throws HyracksDataException, IOException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
+ /**
+ * Merge the run files once.
+ *
+ * @param runs
+ * @param passCount
+ * @throws HyracksDataException
+ * @throws IOException
+ */
+ private void doPass(LinkedList<RunFileReader> runs, int passCount) throws HyracksDataException,
+ IOException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
- int[] storedKeys = new int[keyFields.length];
- // Get the list of the fields in the stored records.
- for (int i = 0; i < keyFields.length; ++i) {
- storedKeys[i] = i;
- }
+ int[] storedKeys = new int[keyFields.length];
+ // Get the list of the fields in the stored records.
+ for (int i = 0; i < keyFields.length; ++i) {
+ storedKeys[i] = i;
+ }
- // Release the space not used
- if (runs.size() + 1 <= framesLimit) {
- // If there are run files no more than the available
- // frame slots...
- // No run file to be generated, since the result can be
- // directly
- // outputted into the output frame for write.
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- // Otherwise, a new run file will be created
- newRun = ctx.getJobletContext().createWorkspaceFile(
- ExternalHashGroupOperatorDescriptor.class
- .getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- // Create run file read handler for each input frame
- RunFileReader[] runFileReaders = new RunFileReader[inFrames
- .size()];
- // Create input frame accessor
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
- .size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
- ctx.getFrameSize(), recordDescriptors[0],
- inFrames.size(), comparator);
- // For the index of tuples visited in each frame.
- int[] tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runFileReaders[runIndex] = runs.get(runIndex);
- runFileReaders[runIndex].open();
- // Load the first frame of the file into the main
- // memory
- if (runFileReaders[runIndex].nextFrame(inFrames
- .get(runIndex))) {
- // initialize the tuple accessor for the frame
- tupleAccessors[runIndex] = new FrameTupleAccessor(
- ctx.getFrameSize(),
- recordDescriptors[0]);
- tupleAccessors[runIndex].reset(inFrames
- .get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes,
- runFileReaders, tupleAccessors,
- topTuples);
- } else {
- closeRun(runIndex, runFileReaders,
- tupleAccessors);
- }
- }
- // Merge
- // Get a key holder for the current working
- // aggregator keys
- visitingAggregator = null;
- visitingKeyTuple = null;
- // Loop on all run files, and update the key
- // holder.
- while (!topTuples.areRunsExhausted()) {
- // Get the top record
- ReferenceEntry top = topTuples.peek();
- int tupleIndex = top.getTupleIndex();
- int runIndex = topTuples.peek().getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- if (visitingAggregator == null) {
- // Initialize the aggregator
- visitingAggregator = aggregatorFactory
- .createSpillableAggregator(ctx,
- recordDescriptors[0],
- recordDescriptors[0]);
- // Initialize the partial aggregation result
- visitingAggregator.initFromPartial(fta,
- tupleIndex, keyFields);
- visitingKeyTuple = new ArrayTupleBuilder(
- recordDescriptors[0].getFields().length);
- for (int i = 0; i < keyFields.length; i++) {
- visitingKeyTuple.addField(fta, tupleIndex,
- keyFields[i]);
- }
- } else {
- if (compareTupleWithFrame(visitingKeyTuple,
- fta, tupleIndex, storedKeys, keyFields,
- comparators) == 0) {
- // If the two partial results are on the
- // same key
- visitingAggregator.accumulatePartialResult(
- fta, tupleIndex, keyFields);
- } else {
- // Otherwise, write the partial result back
- // to the output frame
- if (!visitingAggregator.output(
- outFrameAppender, visitingKeyTuple)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!visitingAggregator.output(
- outFrameAppender,
- visitingKeyTuple)) {
- throw new IllegalStateException();
- }
- }
- // Reset the partial aggregation result
- visitingAggregator.initFromPartial(fta,
- tupleIndex, keyFields);
- visitingKeyTuple.reset();
- for (int i = 0; i < keyFields.length; i++) {
- visitingKeyTuple.addField(fta,
- tupleIndex, keyFields[i]);
- }
- }
- }
- tupleIndexes[runIndex]++;
- setNextTopTuple(runIndex, tupleIndexes,
- runFileReaders, tupleAccessors, topTuples);
- }
- // Output the last aggregation result in the frame
- if (visitingAggregator != null) {
- if (!visitingAggregator.output(outFrameAppender,
- visitingKeyTuple)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!visitingAggregator.output(
- outFrameAppender, visitingKeyTuple)) {
- throw new IllegalStateException();
- }
- }
- }
- // Output data into run file writer after all tuples
- // have been checked
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- // empty the input frames
- runs.subList(0, inFrames.size()).clear();
- // insert the new run file into the beginning of the run
- // file list
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
- }
- } catch (Exception ex) {
- throw new HyracksDataException(ex);
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
+ // Release the space not used
+ if (runs.size() + 1 <= framesLimit) {
+ // If the number of run files does not exceed the
+ // available frame slots, no new run file is needed:
+ // the result can be written directly to the output
+ // frame.
+ finalPass = true;
+ for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+ inFrames.remove(i);
+ }
+ } else {
+ // Otherwise, a new run file will be created
+ newRun = ctx.getJobletContext().createWorkspaceFile(
+ ExternalHashGroupOperatorDescriptor.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ // Create run file read handler for each input frame
+ RunFileReader[] runFileReaders = new RunFileReader[inFrames.size()];
+ // Create input frame accessor
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+ recordDescriptors[0], inFrames.size(), comparator);
+ // For the index of tuples visited in each frame.
+ int[] tupleIndexes = new int[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ tupleIndexes[i] = 0;
+ int runIndex = topTuples.peek().getRunid();
+ runFileReaders[runIndex] = runs.get(runIndex);
+ runFileReaders[runIndex].open();
+ // Load the first frame of the file into the main
+ // memory
+ if (runFileReaders[runIndex].nextFrame(inFrames.get(runIndex))) {
+ // initialize the tuple accessor for the frame
+ tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptors[0]);
+ tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+ setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
+ } else {
+ closeRun(runIndex, runFileReaders, tupleAccessors);
+ }
+ }
+ // Merge
+ // Get a key holder for the current working
+ // aggregator keys
+ visitingAggregator = null;
+ visitingKeyTuple = null;
+ // Loop on all run files, and update the key
+ // holder.
+ while (!topTuples.areRunsExhausted()) {
+ // Get the top record
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+ if (visitingAggregator == null) {
+ // Initialize the aggregator
+ visitingAggregator = aggregatorFactory.createSpillableAggregator(ctx,
+ recordDescriptors[0], recordDescriptors[0]);
+ // Initialize the partial aggregation result
+ visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
+ visitingKeyTuple = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+ for (int i = 0; i < keyFields.length; i++) {
+ visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
+ }
+ } else {
+ if (compareTupleWithFrame(visitingKeyTuple, fta, tupleIndex, storedKeys, keyFields,
+ comparators) == 0) {
+ // If the two partial results are on the
+ // same key
+ visitingAggregator.accumulatePartialResult(fta, tupleIndex, keyFields);
+ } else {
+ // Otherwise, write the partial result back
+ // to the output frame
+ if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+ throw new IllegalStateException();
+ }
+ }
+ // Reset the partial aggregation result
+ visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
+ visitingKeyTuple.reset();
+ for (int i = 0; i < keyFields.length; i++) {
+ visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
+ }
+ }
+ }
+ tupleIndexes[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
+ }
+ // Output the last aggregation result in the frame
+ if (visitingAggregator != null) {
+ if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ // Output data into run file writer after all tuples
+ // have been checked
+ if (outFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ }
+ // empty the input frames
+ runs.subList(0, inFrames.size()).clear();
+ // insert the new run file into the beginning of the run
+ // file list
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
- /**
- * Insert the tuple into the priority queue.
- *
- * @param runIndex
- * @param tupleIndexes
- * @param runCursors
- * @param tupleAccessors
- * @param topTuples
- * @throws IOException
- */
- private void setNextTopTuple(int runIndex, int[] tupleIndexes,
- RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors,
- ReferencedPriorityQueue topTuples) throws IOException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes,
- runCursors, tupleAccessors);
- if (exists) {
- topTuples.popAndReplace(tupleAccessors[runIndex],
- tupleIndexes[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
+ /**
+ * Insert the tuple into the priority queue.
+ *
+ * @param runIndex
+ * @param tupleIndexes
+ * @param runCursors
+ * @param tupleAccessors
+ * @param topTuples
+ * @throws IOException
+ */
+ private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
+ boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ if (exists) {
+ topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
- /**
- * Check whether there are any more tuples to be checked for the
- * given run file from the corresponding input frame.
- *
- * If the input frame for this run file is exhausted, load a new
- * frame of the run file into the input frame.
- *
- * @param runIndex
- * @param tupleIndexes
- * @param runCursors
- * @param tupleAccessors
- * @return
- * @throws IOException
- */
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes,
- RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws IOException {
+ /**
+ * Check whether there are any more tuples to be checked for the
+ * given run file from the corresponding input frame.
+ * If the input frame for this run file is exhausted, load a new
+ * frame of the run file into the input frame.
+ *
+ * @param runIndex
+ * @param tupleIndexes
+ * @param runCursors
+ * @param tupleAccessors
+ * @return
+ * @throws IOException
+ */
+ private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors) throws IOException {
- if (tupleAccessors[runIndex] == null
- || runCursors[runIndex] == null) {
- /*
- * Return false if the targeting run file is not
- * available, or the frame for the run file is not
- * available.
- */
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex]
- .getTupleCount()) {
- /*
- * If all tuples in the targeting frame have been
- * checked.
- */
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- // Refill the buffer with contents from the run file.
- if (runCursors[runIndex].nextFrame(buf)) {
- tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes,
- runCursors, tupleAccessors);
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
+ if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+ /*
+ * Return false if the target run file or its
+ * input frame is not available.
+ */
+ return false;
+ } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+ /*
+ * If all tuples in the target frame have been
+ * checked.
+ */
+ ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same as inFrames.get(runIndex)
+ // Refill the buffer with contents from the run file.
+ if (runCursors[runIndex].nextFrame(buf)) {
+ tupleIndexes[runIndex] = 0;
+ return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
- /**
- * Close the run file, and also the corresponding readers and
- * input frame.
- *
- * @param index
- * @param runCursors
- * @param tupleAccessor
- * @throws HyracksDataException
- */
- private void closeRun(int index, RunFileReader[] runCursors,
- IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
+ /**
+ * Close the run file, and also the corresponding readers and
+ * input frame.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ runCursors[index].close();
+ runCursors[index] = null;
+ tupleAccessor[index] = null;
+ }
- /**
- * Compare a tuple (in the format of a {@link ArrayTupleBuilder}
- * ) with a record in a frame (in the format of a
- * {@link FrameTupleAccessor}). Comparing keys and comparators
- * are specified for this method as inputs.
- *
- * @param tuple0
- * @param accessor1
- * @param tIndex1
- * @param keys0
- * @param keys1
- * @param comparators
- * @return
- */
- private int compareTupleWithFrame(ArrayTupleBuilder tuple0,
- FrameTupleAccessor accessor1, int tIndex1, int[] keys0,
- int[] keys1, IBinaryComparator[] comparators) {
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength()
- + tStart1;
+ /**
+ * Compare a tuple (held in an {@link ArrayTupleBuilder}) with a record in a frame
+ * (accessed through a {@link FrameTupleAccessor}). The key fields and comparators
+ * to use are passed in as arguments.
+ *
+ * @param tuple0
+ * @param accessor1
+ * @param tIndex1
+ * @param keys0
+ * @param keys1
+ * @param comparators
+ * @return
+ */
+ private int compareTupleWithFrame(ArrayTupleBuilder tuple0, FrameTupleAccessor accessor1, int tIndex1,
+ int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
- for (int i = 0; i < keys0.length; ++i) {
- int fIdx0 = keys0[i];
- int fStart0 = (i == 0 ? 0
- : tuple0.getFieldEndOffsets()[fIdx0 - 1]);
- int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
- int fLen0 = fEnd0 - fStart0;
+ for (int i = 0; i < keys0.length; ++i) {
+ int fIdx0 = keys0[i];
+ int fStart0 = (i == 0 ? 0 : tuple0.getFieldEndOffsets()[fIdx0 - 1]);
+ int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
+ int fLen0 = fEnd0 - fStart0;
- int fIdx1 = keys1[i];
- int fStart1 = accessor1.getFieldStartOffset(tIndex1,
- fIdx1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
- int fLen1 = fEnd1 - fStart1;
+ int fIdx1 = keys1[i];
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
+ int fLen1 = fEnd1 - fStart1;
- int c = comparators[i].compare(tuple0.getByteArray(),
- fStart0, fLen0, accessor1.getBuffer().array(),
- fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- return op;
- }
+ int c = comparators[i].compare(tuple0.getByteArray(), fStart0, fLen0, accessor1.getBuffer()
+ .array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ };
+ return op;
+ }
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return ExternalHashGroupOperatorDescriptor.this;
+ }
- private Comparator<ReferenceEntry> createEntryComparator(
- final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
- public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1
- .getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2
- .getAccessor();
- int j1 = (Integer) tp1.getTupleIndex();
- int j2 = (Integer) tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = keyFields[f];
- int s1 = fta1.getTupleStartOffset(j1)
- + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx)
- - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2)
- + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx)
- - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- }
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+ public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+ int j1 = (Integer) tp1.getTupleIndex();
+ int j2 = (Integer) tp2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = keyFields[f];
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ };
+ }
- }
+ }
}
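The operator above runs in two phases: the build activity spills sorted runs of partial aggregates, and the MergeActivity replays those runs through a priority queue, combining consecutive entries that share a key. A minimal sketch of the same merge-and-combine idea follows; it uses plain Java collections and (key, partialValue) pairs instead of Hyracks frames, RunFileReaders, and the ReferencedPriorityQueue, and all names in it are hypothetical, not taken from the Hyracks sources.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSortedPartialsSketch {
    // Each run is an iterator over {key, partialValue} pairs, sorted by key.
    static List<long[]> mergeRuns(List<Iterator<long[]>> runs) {
        // Heap ordered by key; the third slot remembers which run the entry came from.
        PriorityQueue<long[]> heap = new PriorityQueue<>(Comparator.<long[]> comparingLong(e -> e[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (runs.get(r).hasNext()) {
                long[] e = runs.get(r).next();
                heap.add(new long[] { e[0], e[1], r });
            }
        }
        List<long[]> out = new ArrayList<long[]>();
        long curKey = 0;
        long curValue = 0;
        boolean open = false;
        while (!heap.isEmpty()) {
            long[] top = heap.poll();
            if (open && top[0] == curKey) {
                // Same key as the group being built: accumulate the partial result.
                curValue += top[1];
            } else {
                if (open) {
                    out.add(new long[] { curKey, curValue }); // emit the finished group
                }
                curKey = top[0];
                curValue = top[1];
                open = true;
            }
            // Refill the heap from the run the popped entry came from.
            int r = (int) top[2];
            if (runs.get(r).hasNext()) {
                long[] e = runs.get(r).next();
                heap.add(new long[] { e[0], e[1], r });
            }
        }
        if (open) {
            out.add(new long[] { curKey, curValue });
        }
        return out;
    }
}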
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
index a2e8281..3fc7d79 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
@@ -21,11 +21,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IAccumulatingAggregatorFactory extends Serializable {
- IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx,
- RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor)
- throws HyracksDataException;
+ IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException;
- ISpillableAccumulatingAggregator createSpillableAggregator(
- IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+ ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
+ RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) throws HyracksDataException;
}
\ No newline at end of file
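The factory, rather than the aggregator, is the Serializable piece: only the factory travels with the job specification, and it creates fresh, stateful aggregator instances per partition on each node. A minimal sketch of that pattern follows, using hypothetical MIN interfaces for illustration rather than the Hyracks ones.

import java.io.Serializable;

interface MinAggregator {
    void init();                             // start a new group
    void accumulate(long value);             // fold in one raw input value
    void accumulatePartial(long partialMin); // fold in a partial result read back from a run
    long output();                           // final value for the group
}

interface MinAggregatorFactory extends Serializable {
    MinAggregator createAggregator();
}

class LongMinAggregatorFactory implements MinAggregatorFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public MinAggregator createAggregator() {
        return new MinAggregator() {
            private long min;

            public void init() {
                min = Long.MAX_VALUE;
            }

            public void accumulate(long value) {
                min = Math.min(min, value);
            }

            public void accumulatePartial(long partialMin) {
                min = Math.min(min, partialMin);
            }

            public long output() {
                return min;
            }
        };
    }
}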
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
index ab7b624..59c69eb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
@@ -22,23 +22,19 @@
/**
* A spillable version of the {@link IAccumulatingAggregator} supporting
* external aggregation.
- *
*/
-public interface ISpillableAccumulatingAggregator extends
- IAccumulatingAggregator {
+public interface ISpillableAccumulatingAggregator extends IAccumulatingAggregator {
- public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
- int[] keyFieldIndexes) throws HyracksDataException;
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+ throws HyracksDataException;
- /**
- *
- * @param accessor
- * @param tIndex
- * @throws HyracksDataException
- */
- public void accumulatePartialResult(IFrameTupleAccessor accessor,
- int tIndex, int[] keyFieldIndexes) throws HyracksDataException;
+ /**
+ * @param accessor
+ * @param tIndex
+ * @throws HyracksDataException
+ */
+ public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
+ throws HyracksDataException;
- public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder)
- throws HyracksDataException;
+ public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException;
}
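The spillable contract above (initFromPartial and accumulatePartialResult) assumes the aggregate can be split into partial results that are combined later. AVG is the classic case of a function that only becomes combinable when the partial state carries both a sum and a count; the following hypothetical sketch illustrates that decomposition independently of the Hyracks interfaces.

class AvgPartial {
    long sum;
    long count;

    // Phase 1: fold in raw input records while the group fits in memory.
    void accumulate(long value) {
        sum += value;
        count++;
    }

    // Phase 2: fold in another partial result read back from a spilled run.
    void combine(AvgPartial other) {
        sum += other.sum;
        count += other.count;
    }

    // Final output, computed only once after all partials have been combined.
    double finish() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}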
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
index 16bd972..b3b9f24 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
@@ -34,551 +34,536 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
/**
- * An in-mem hash table for spillable grouping operations.
- *
+ * An in-mem hash table for spillable grouping operations.
* A table of {@link #Link}s is maintained in this object, and each row
- * of this table represents a hash partition.
- *
- *
+ * of this table represents a hash partition.
*/
public class SpillableGroupingHashTable {
- /**
- * Context.
- */
- private final IHyracksStageletContext ctx;
+ /**
+ * Context.
+ */
+ private final IHyracksStageletContext ctx;
- /**
- * Columns for group-by
- */
- private final int[] fields;
+ /**
+ * Columns for group-by
+ */
+ private final int[] fields;
- /**
- * Key fields of records in the hash table (starting from 0
- * to the number of the key fields).
- *
- * This is different from the key fields in the input records,
- * since these fields are extracted when being inserted into
- * the hash table.
- */
- private final int[] storedKeys;
+ /**
+ * Key fields of records in the hash table (starting from 0
+ * to the number of the key fields).
+ * This is different from the key fields in the input records,
+ * since these fields are extracted when being inserted into
+ * the hash table.
+ */
+ private final int[] storedKeys;
- /**
- * Comparators: one for each column in {@link #groupFields}
- */
- private final IBinaryComparator[] comparators;
+ /**
+ * Comparators: one for each column in {@link #groupFields}
+ */
+ private final IBinaryComparator[] comparators;
- /**
- * Record descriptor for the input tuple.
- */
- private final RecordDescriptor inRecordDescriptor;
+ /**
+ * Record descriptor for the input tuple.
+ */
+ private final RecordDescriptor inRecordDescriptor;
- /**
- * Record descriptor for the partial aggregation result.
- */
- private final RecordDescriptor outputRecordDescriptor;
+ /**
+ * Record descriptor for the partial aggregation result.
+ */
+ private final RecordDescriptor outputRecordDescriptor;
- /**
- * Accumulators in the main memory.
- */
- private ISpillableAccumulatingAggregator[] accumulators;
+ /**
+ * Accumulators in the main memory.
+ */
+ private ISpillableAccumulatingAggregator[] accumulators;
- /**
- * The hashing group table containing pointers to aggregators and also the
- * corresponding key tuples. So for each entry, there will be three integer
- * fields:
- *
- * 1. The frame index containing the key tuple; 2. The tuple index inside of
- * the frame for the key tuple; 3. The index of the aggregator.
- *
- * Note that each link in the table is a partition for the input records. Multiple
- * records in the same partition based on the {@link #tpc} are stored as
- * pointers.
- */
- private final Link[] table;
+ /**
+ * The hashing group table containing pointers to aggregators and also the
+ * corresponding key tuples. So for each entry, there will be three integer
+ * fields:
+ * 1. The frame index containing the key tuple; 2. The tuple index inside of
+ * the frame for the key tuple; 3. The index of the aggregator.
+ * Note that each link in the table is one partition of the input records. Multiple
+ * records falling into the same partition (as determined by {@link #tpc}) are stored
+ * as pointers.
+ */
+ private final Link[] table;
- /**
- * Number of accumulators.
- */
- private int accumulatorSize = 0;
+ /**
+ * Number of accumulators.
+ */
+ private int accumulatorSize = 0;
- /**
- * Factory for the aggregators.
- */
- private final IAccumulatingAggregatorFactory aggregatorFactory;
+ /**
+ * Factory for the aggregators.
+ */
+ private final IAccumulatingAggregatorFactory aggregatorFactory;
- private final List<ByteBuffer> frames;
-
- private final ByteBuffer outFrame;
+ private final List<ByteBuffer> frames;
- /**
- * Frame appender for output frames in {@link #frames}.
- */
- private final FrameTupleAppender appender;
+ private final ByteBuffer outFrame;
- /**
- * The count of used frames in the table.
- *
- * Note that this cannot be replaced by {@link #frames} since frames will
- * not be removed after being created.
- */
- private int dataFrameCount;
+ /**
+ * Frame appender for output frames in {@link #frames}.
+ */
+ private final FrameTupleAppender appender;
- /**
- * Pointers for the sorted aggregators
- */
- private int[] tPointers;
+ /**
+ * The count of used frames in the table.
+ * Note that this cannot be replaced by {@link #frames} since frames will
+ * not be removed after being created.
+ */
+ private int dataFrameCount;
- private static final int INIT_ACCUMULATORS_SIZE = 8;
+ /**
+ * Pointers for the sorted aggregators
+ */
+ private int[] tPointers;
- /**
- * The maximum number of frames available for this hashing group table.
- */
- private final int framesLimit;
+ private static final int INIT_ACCUMULATORS_SIZE = 8;
- private final FrameTuplePairComparator ftpc;
+ /**
+ * The maximum number of frames available for this hashing group table.
+ */
+ private final int framesLimit;
- /**
- * A partition computer to partition the hashing group table.
- */
- private final ITuplePartitionComputer tpc;
+ private final FrameTuplePairComparator ftpc;
- /**
- * Accessors for the tuples. Two accessors are necessary during the sort.
- */
- private final FrameTupleAccessor storedKeysAccessor1;
- private final FrameTupleAccessor storedKeysAccessor2;
+ /**
+ * A partition computer to partition the hashing group table.
+ */
+ private final ITuplePartitionComputer tpc;
- /**
- * Create a spillable grouping hash table.
- * @param ctx The context of the job.
- * @param fields Fields of keys for grouping.
- * @param comparatorFactories The comparators.
- * @param tpcf The partitioners. These are used to partition the incoming records into proper partition of the hash table.
- * @param aggregatorFactory The aggregators.
- * @param inRecordDescriptor Record descriptor for input data.
- * @param outputRecordDescriptor Record descriptor for output data.
- * @param framesLimit The maximum number of frames usable in the memory for hash table.
- * @param tableSize The size of the table, which specified the number of partitions of the table.
- */
- public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
- IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf,
- IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outputRecordDescriptor, int framesLimit,
- int tableSize) {
- this.ctx = ctx;
- this.fields = fields;
+ /**
+ * Accessors for the tuples. Two accessors are necessary during the sort.
+ */
+ private final FrameTupleAccessor storedKeysAccessor1;
+ private final FrameTupleAccessor storedKeysAccessor2;
- storedKeys = new int[fields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-
- // Note that after storing a record into the hash table, the index for the fields should
- // be updated. Here we assume that all these key fields are written at the beginning of
- // the record, so their index should start from 0 and end at the length of the key fields.
- for (int i = 0; i < fields.length; ++i) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
- }
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
- storedKeySerDeser);
- storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
- storedKeysRecordDescriptor);
- storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
- storedKeysRecordDescriptor);
+ /**
+ * Create a spillable grouping hash table.
+ *
+ * @param ctx
+ * The context of the job.
+ * @param fields
+ * Fields of keys for grouping.
+ * @param comparatorFactories
+ * The comparators.
+ * @param tpcf
+ * The partitioners. These are used to partition the incoming records into the proper partitions of the hash table.
+ * @param aggregatorFactory
+ * The aggregators.
+ * @param inRecordDescriptor
+ * Record descriptor for input data.
+ * @param outputRecordDescriptor
+ * Record descriptor for output data.
+ * @param framesLimit
+ * The maximum number of frames usable in the memory for hash table.
+ * @param tableSize
+ * The size of the table, which specifies the number of partitions in the table.
+ */
+ public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
+ IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory tpcf,
+ IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outputRecordDescriptor, int framesLimit, int tableSize) {
+ this.ctx = ctx;
+ this.fields = fields;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
+ storedKeys = new int[fields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
- this.table = new Link[tableSize];
+ // Note that after storing a record into the hash table, the index for the fields should
+ // be updated. Here we assume that all these key fields are written at the beginning of
+ // the record, so their index should start from 0 and end at the length of the key fields.
+ for (int i = 0; i < fields.length; ++i) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+ }
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
+ storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+ storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
- this.aggregatorFactory = aggregatorFactory;
- accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
- this.framesLimit = framesLimit;
+ this.table = new Link[tableSize];
- // Tuple pair comparator
- ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-
- // Partitioner
- tpc = tpcf.createPartitioner();
+ this.aggregatorFactory = aggregatorFactory;
+ accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
- this.inRecordDescriptor = inRecordDescriptor;
- this.outputRecordDescriptor = outputRecordDescriptor;
- frames = new ArrayList<ByteBuffer>();
- appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.framesLimit = framesLimit;
- dataFrameCount = -1;
-
- outFrame = ctx.allocateFrame();
- }
+ // Tuple pair comparator
+ ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
- public void reset() {
- dataFrameCount = -1;
- tPointers = null;
- // Reset the grouping hash table
- for (int i = 0; i < table.length; i++) {
- table[i] = new Link();
- }
- }
+ // Partitioner
+ tpc = tpcf.createPartitioner();
- public int getFrameCount() {
- return dataFrameCount;
- }
+ this.inRecordDescriptor = inRecordDescriptor;
+ this.outputRecordDescriptor = outputRecordDescriptor;
+ frames = new ArrayList<ByteBuffer>();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
- /**
- * How to define pointers for the partial aggregation
- *
- * @return
- */
- public int[] getTPointers() {
- return tPointers;
- }
+ dataFrameCount = -1;
- /**
- * Redefine the number of fields in the pointer.
- *
- * Only two pointers are necessary for external grouping: one is to the
- * index of the hash table, and the other is to the row index inside of the
- * hash table.
- *
- * @return
- */
- public int getPtrFields() {
- return 2;
- }
+ outFrame = ctx.allocateFrame();
+ }
- public List<ByteBuffer> getFrames() {
- return frames;
- }
+ public void reset() {
+ dataFrameCount = -1;
+ tPointers = null;
+ // Reset the grouping hash table
+ for (int i = 0; i < table.length; i++) {
+ table[i] = new Link();
+ }
+ }
- /**
- * Set the working frame to the next available frame in the
- * frame list. There are two cases:<br>
- *
- * 1) If the next frame is not initialized, allocate
- * a new frame.
- *
- * 2) When frames are already created, they are recycled.
- *
- * @return Whether a new frame is added successfully.
- */
- private boolean nextAvailableFrame() {
- // Return false if the number of frames is equal to the limit.
- if (dataFrameCount + 1 >= framesLimit)
- return false;
-
- if (frames.size() < framesLimit) {
- // Insert a new frame
- ByteBuffer frame = ctx.allocateFrame();
- frame.position(0);
- frame.limit(frame.capacity());
- frames.add(frame);
- appender.reset(frame, true);
- dataFrameCount++;
- } else {
- // Reuse an old frame
- dataFrameCount++;
- ByteBuffer frame = frames.get(dataFrameCount);
- frame.position(0);
- frame.limit(frame.capacity());
- appender.reset(frame, true);
- }
- return true;
- }
+ public int getFrameCount() {
+ return dataFrameCount;
+ }
- /**
- * Insert a new record from the input frame.
- *
- * @param accessor
- * @param tIndex
- * @return
- * @throws HyracksDataException
- */
- public boolean insert(FrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- if (dataFrameCount < 0)
- nextAvailableFrame();
- // Get the partition for the inserting tuple
- int entry = tpc.partition(accessor, tIndex, table.length);
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- // Find the corresponding aggregator from existing aggregators
- ISpillableAccumulatingAggregator aggregator = null;
- for (int i = 0; i < link.size; i += 3) {
- int sbIndex = link.pointers[i];
- int stIndex = link.pointers[i + 1];
- int saIndex = link.pointers[i + 2];
- storedKeysAccessor1.reset(frames.get(sbIndex));
- int c = ftpc
- .compare(accessor, tIndex, storedKeysAccessor1, stIndex);
- if (c == 0) {
- aggregator = accumulators[saIndex];
- break;
- }
- }
- // Do insert
- if (aggregator == null) {
- // Did not find the aggregator. Insert a new aggregator entry
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- if (!nextAvailableFrame()) {
- // If buffer is full, return false to trigger a run file
- // write
- return false;
- } else {
- // Try to do insert after adding a new frame.
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- throw new IllegalStateException();
- }
- }
- }
- int sbIndex = dataFrameCount;
- int stIndex = appender.getTupleCount() - 1;
- if (accumulatorSize >= accumulators.length) {
- accumulators = Arrays.copyOf(accumulators,
- accumulators.length * 2);
- }
- int saIndex = accumulatorSize++;
- aggregator = accumulators[saIndex] = aggregatorFactory
- .createSpillableAggregator(ctx, inRecordDescriptor,
- outputRecordDescriptor);
- aggregator.init(accessor, tIndex);
- link.add(sbIndex, stIndex, saIndex);
- }
- aggregator.accumulate(accessor, tIndex);
- return true;
- }
+ /**
+ * How to define pointers for the partial aggregation
+ *
+ * @return
+ */
+ public int[] getTPointers() {
+ return tPointers;
+ }
- /**
- * Sort partial results
- */
- public void sortFrames() {
- int totalTCount = 0;
- // Get the number of records
- for (int i = 0; i < table.length; i++) {
- if (table[i] == null)
- continue;
- totalTCount += table[i].size / 3;
- }
- // Start sorting:
- /*
- * Based on the data structure for the partial aggregates, the
- * pointers should be initialized.
- */
- tPointers = new int[totalTCount * getPtrFields()];
- // Initialize pointers
- int ptr = 0;
- // Maintain two pointers to each entry of the hashing group table
- for (int i = 0; i < table.length; i++) {
- if (table[i] == null)
- continue;
- for (int j = 0; j < table[i].size; j = j + 3) {
- tPointers[ptr * getPtrFields()] = i;
- tPointers[ptr * getPtrFields() + 1] = j;
- ptr++;
- }
- }
- // Sort using quick sort
- if (tPointers.length > 0) {
- sort(tPointers, 0, totalTCount);
- }
- }
+ /**
+ * Return the number of fields in each pointer entry.
+ * Only two fields are necessary for external grouping: one for the
+ * index into the hash table, and the other for the row index inside
+ * the corresponding table entry.
+ *
+ * @return
+ */
+ public int getPtrFields() {
+ return 2;
+ }
- /**
- *
- * @param writer
- * @throws HyracksDataException
- */
- public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ public List<ByteBuffer> getFrames() {
+ return frames;
+ }
- ISpillableAccumulatingAggregator aggregator = null;
- writer.open();
- appender.reset(outFrame, true);
- if (sorted){
- sortFrames();
- }
- if(tPointers == null){
- // Not sorted
- for (int i = 0; i < table.length; ++i) {
- Link link = table[i];
- if (link != null) {
- for (int j = 0; j < link.size; j += 3) {
- int bIndex = link.pointers[j];
- int tIndex = link.pointers[j + 1];
- int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = frames.get(bIndex);
- storedKeysAccessor1.reset(keyBuffer);
- aggregator = accumulators[aIndex];
- while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- }
- }
- }
- }
- if (appender.getTupleCount() != 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- return;
- }
- int n = tPointers.length / getPtrFields();
- for (int ptr = 0; ptr < n; ptr++) {
- int tableIndex = tPointers[ptr * 2];
- int rowIndex = tPointers[ptr * 2 + 1];
- int frameIndex = table[tableIndex].pointers[rowIndex];
- int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
- int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
- // Get the frame containing the value
- ByteBuffer buffer = frames.get(frameIndex);
- storedKeysAccessor1.reset(buffer);
+ /**
+ * Set the working frame to the next available frame in the
+ * frame list. There are two cases:<br>
+ * 1) If the next frame is not initialized, allocate
+ * a new frame.
+ * 2) When frames are already created, they are recycled.
+ *
+ * @return Whether a new frame is added successfully.
+ */
+ private boolean nextAvailableFrame() {
+ // Return false if the number of frames has reached the limit.
+ if (dataFrameCount + 1 >= framesLimit)
+ return false;
- // Get the aggregator
- aggregator = accumulators[aggregatorIndex];
- // Insert
- if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex,
- fields)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!aggregator.output(appender, storedKeysAccessor1,
- tupleIndex, fields)) {
- throw new IllegalStateException();
- } else {
- accumulators[aggregatorIndex] = null;
- }
- } else {
- accumulators[aggregatorIndex] = null;
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
+ if (frames.size() < framesLimit) {
+ // Insert a new frame
+ ByteBuffer frame = ctx.allocateFrame();
+ frame.position(0);
+ frame.limit(frame.capacity());
+ frames.add(frame);
+ appender.reset(frame, true);
+ dataFrameCount++;
+ } else {
+ // Reuse an old frame
+ dataFrameCount++;
+ ByteBuffer frame = frames.get(dataFrameCount);
+ frame.position(0);
+ frame.limit(frame.capacity());
+ appender.reset(frame, true);
+ }
+ return true;
+ }
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- // Get table index
- int mTable = tPointers[m * 2];
- int mRow = tPointers[m * 2 + 1];
- // Get frame and tuple index
- int mFrame = table[mTable].pointers[mRow];
- int mTuple = table[mTable].pointers[mRow + 1];
- storedKeysAccessor1.reset(frames.get(mFrame));
+ /**
+ * Insert a new record from the input frame.
+ *
+ * @param accessor
+ * @param tIndex
+ * @return
+ * @throws HyracksDataException
+ */
+ public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ if (dataFrameCount < 0)
+ nextAvailableFrame();
+ // Get the partition for the inserting tuple
+ int entry = tpc.partition(accessor, tIndex, table.length);
+ Link link = table[entry];
+ if (link == null) {
+ link = table[entry] = new Link();
+ }
+ // Find the corresponding aggregator from existing aggregators
+ ISpillableAccumulatingAggregator aggregator = null;
+ for (int i = 0; i < link.size; i += 3) {
+ int sbIndex = link.pointers[i];
+ int stIndex = link.pointers[i + 1];
+ int saIndex = link.pointers[i + 2];
+ storedKeysAccessor1.reset(frames.get(sbIndex));
+ int c = ftpc.compare(accessor, tIndex, storedKeysAccessor1, stIndex);
+ if (c == 0) {
+ aggregator = accumulators[saIndex];
+ break;
+ }
+ }
+ // Do insert
+ if (aggregator == null) {
+ // Did not find the aggregator. Insert a new aggregator entry
+ if (!appender.appendProjection(accessor, tIndex, fields)) {
+ if (!nextAvailableFrame()) {
+ // If buffer is full, return false to trigger a run file
+ // write
+ return false;
+ } else {
+ // Try to do insert after adding a new frame.
+ if (!appender.appendProjection(accessor, tIndex, fields)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ int sbIndex = dataFrameCount;
+ int stIndex = appender.getTupleCount() - 1;
+ if (accumulatorSize >= accumulators.length) {
+ accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
+ }
+ int saIndex = accumulatorSize++;
+ aggregator = accumulators[saIndex] = aggregatorFactory.createSpillableAggregator(ctx, inRecordDescriptor,
+ outputRecordDescriptor);
+ aggregator.init(accessor, tIndex);
+ link.add(sbIndex, stIndex, saIndex);
+ }
+ aggregator.accumulate(accessor, tIndex);
+ return true;
+ }
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int bTable = tPointers[b * 2];
- int bRow = tPointers[b * 2 + 1];
- int bFrame = table[bTable].pointers[bRow];
- int bTuple = table[bTable].pointers[bRow + 1];
- storedKeysAccessor2.reset(frames.get(bFrame));
- int cmp = ftpc.compare(storedKeysAccessor2, bTuple,
- storedKeysAccessor1, mTuple);
- // int cmp = compare(tPointers, b, mi, mj, mv);
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cTable = tPointers[c * 2];
- int cRow = tPointers[c * 2 + 1];
- int cFrame = table[cTable].pointers[cRow];
- int cTuple = table[cTable].pointers[cRow + 1];
- storedKeysAccessor2.reset(frames.get(cFrame));
- int cmp = ftpc.compare(storedKeysAccessor2, cTuple,
- storedKeysAccessor1, mTuple);
- // int cmp = compare(tPointers, c, mi, mj, mv);
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(tPointers, b++, c--);
- }
+ /**
+ * Sort partial results
+ */
+ public void sortFrames() {
+ int totalTCount = 0;
+ // Get the number of records
+ for (int i = 0; i < table.length; i++) {
+ if (table[i] == null)
+ continue;
+ totalTCount += table[i].size / 3;
+ }
+ // Start sorting:
+ /*
+ * Based on the data structure for the partial aggregates, the
+ * pointers should be initialized.
+ */
+ tPointers = new int[totalTCount * getPtrFields()];
+ // Initialize pointers
+ int ptr = 0;
+ // Maintain two pointers to each entry of the hashing group table
+ for (int i = 0; i < table.length; i++) {
+ if (table[i] == null)
+ continue;
+ for (int j = 0; j < table[i].size; j = j + 3) {
+ tPointers[ptr * getPtrFields()] = i;
+ tPointers[ptr * getPtrFields() + 1] = j;
+ ptr++;
+ }
+ }
+ // Sort using quick sort
+ if (tPointers.length > 0) {
+ sort(tPointers, 0, totalTCount);
+ }
+ }
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
+ /**
+ * @param writer
+ * @throws HyracksDataException
+ */
+ public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
- }
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
- }
- }
+ ISpillableAccumulatingAggregator aggregator = null;
+ writer.open();
+ appender.reset(outFrame, true);
+ if (sorted) {
+ sortFrames();
+ }
+ if (tPointers == null) {
+ // Not sorted
+ for (int i = 0; i < table.length; ++i) {
+ Link link = table[i];
+ if (link != null) {
+ for (int j = 0; j < link.size; j += 3) {
+ int bIndex = link.pointers[j];
+ int tIndex = link.pointers[j + 1];
+ int aIndex = link.pointers[j + 2];
+ ByteBuffer keyBuffer = frames.get(bIndex);
+ storedKeysAccessor1.reset(keyBuffer);
+ aggregator = accumulators[aIndex];
+ while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ }
+ }
+ }
+ }
+ if (appender.getTupleCount() != 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ return;
+ }
+ int n = tPointers.length / getPtrFields();
+ for (int ptr = 0; ptr < n; ptr++) {
+ int tableIndex = tPointers[ptr * 2];
+ int rowIndex = tPointers[ptr * 2 + 1];
+ int frameIndex = table[tableIndex].pointers[rowIndex];
+ int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
+ int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
+ // Get the frame containing the value
+ ByteBuffer buffer = frames.get(frameIndex);
+ storedKeysAccessor1.reset(buffer);
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 2; ++i) {
- int t = x[a * 2 + i];
- x[a * 2 + i] = x[b * 2 + i];
- x[b * 2 + i] = t;
- }
- }
+ // Get the aggregator
+ aggregator = accumulators[aggregatorIndex];
+ // Insert
+ if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
+ throw new IllegalStateException();
+ } else {
+ accumulators[aggregatorIndex] = null;
+ }
+ } else {
+ accumulators[aggregatorIndex] = null;
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ // Get table index
+ int mTable = tPointers[m * 2];
+ int mRow = tPointers[m * 2 + 1];
+ // Get frame and tuple index
+ int mFrame = table[mTable].pointers[mRow];
+ int mTuple = table[mTable].pointers[mRow + 1];
+ storedKeysAccessor1.reset(frames.get(mFrame));
- /**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
- * @author vinayakb
- */
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 9;
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int bTable = tPointers[b * 2];
+ int bRow = tPointers[b * 2 + 1];
+ int bFrame = table[bTable].pointers[bRow];
+ int bTuple = table[bTable].pointers[bRow + 1];
+ storedKeysAccessor2.reset(frames.get(bFrame));
+ int cmp = ftpc.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+ // int cmp = compare(tPointers, b, mi, mj, mv);
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cTable = tPointers[c * 2];
+ int cRow = tPointers[c * 2 + 1];
+ int cFrame = table[cTable].pointers[cRow];
+ int cTuple = table[cTable].pointers[cRow + 1];
+ storedKeysAccessor2.reset(frames.get(cFrame));
+ int cmp = ftpc.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+ // int cmp = compare(tPointers, c, mi, mj, mv);
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c) {
+ break;
+ }
+ swap(tPointers, b++, c--);
+ }
- int[] pointers;
- int size;
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
- Link() {
- pointers = new int[INIT_POINTERS_SIZE];
- size = 0;
- }
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
- void add(int bufferIdx, int tIndex, int accumulatorIdx) {
- while (size + 3 > pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
- }
- pointers[size++] = bufferIdx;
- pointers[size++] = tIndex;
- pointers[size++] = accumulatorIdx;
- }
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 2; ++i) {
+ int t = x[a * 2 + i];
+ x[a * 2 + i] = x[b * 2 + i];
+ x[b * 2 + i] = t;
+ }
+ }
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("[Size=" + size + "]");
- for (int i = 0; i < pointers.length; i = i + 3) {
- sb.append(pointers[i] + ",");
- sb.append(pointers[i + 1] + ",");
- sb.append(pointers[i + 2] + "; ");
- }
- return sb.toString();
- }
- }
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
+
+ /**
+ * The pointers in the link store 3 int values for each entry in the
+ * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ *
+ * @author vinayakb
+ */
+ private static class Link {
+ private static final int INIT_POINTERS_SIZE = 9;
+
+ int[] pointers;
+ int size;
+
+ Link() {
+ pointers = new int[INIT_POINTERS_SIZE];
+ size = 0;
+ }
+
+ void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+ while (size + 3 > pointers.length) {
+ pointers = Arrays.copyOf(pointers, pointers.length * 2);
+ }
+ pointers[size++] = bufferIdx;
+ pointers[size++] = tIndex;
+ pointers[size++] = accumulatorIdx;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[Size=" + size + "]");
+ // Only the first 'size' slots hold valid (bufferIdx, tIndex, accumulatorIdx) triples.
+ for (int i = 0; i < size; i = i + 3) {
+ sb.append(pointers[i] + ",");
+ sb.append(pointers[i + 1] + ",");
+ sb.append(pointers[i + 2] + "; ");
+ }
+ return sb.toString();
+ }
+ }
}
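For reference, the (bufferIdx, tIndex, accumulatorIdx) triple layout documented on Link above can be tried out with a minimal, self-contained sketch. The constant, field, and method names mirror the patch; the main method and the sample values are illustrative only and are not part of this change.

import java.util.Arrays;

// Stand-alone model of the Link pointer layout: three ints per hash-table entry.
class LinkSketch {
    static final int INIT_POINTERS_SIZE = 9; // room for three (bufferIdx, tIndex, accumulatorIdx) triples

    int[] pointers = new int[INIT_POINTERS_SIZE];
    int size = 0;

    void add(int bufferIdx, int tIndex, int accumulatorIdx) {
        // Grow by doubling whenever the next triple would not fit.
        while (size + 3 > pointers.length) {
            pointers = Arrays.copyOf(pointers, pointers.length * 2);
        }
        pointers[size++] = bufferIdx;
        pointers[size++] = tIndex;
        pointers[size++] = accumulatorIdx;
    }

    public static void main(String[] args) {
        LinkSketch link = new LinkSketch();
        link.add(0, 5, 2); // tuple 5 of frame 0, aggregated by accumulator 2
        link.add(3, 1, 7); // tuple 1 of frame 3, aggregated by accumulator 7
        // Walk the entries in steps of three, as flushFrames does for the unsorted path.
        for (int j = 0; j < link.size; j += 3) {
            int bufferIdx = link.pointers[j];
            int tIndex = link.pointers[j + 1];
            int accumulatorIdx = link.pointers[j + 2];
            System.out.println(bufferIdx + "," + tIndex + "," + accumulatorIdx);
        }
    }
}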
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 421b47c..2621049 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -60,517 +60,371 @@
/**
* Test cases for external hash group operator.
- *
*/
public class ExternalAggregateTest extends AbstractIntegrationTest {
- /**
- * Test 01: aggregate (count) on single field, on a simple data set.
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestSingleFieldSimpleData() throws Exception {
- JobSpecification spec = new JobSpecification();
+ /**
+ * Test 01: aggregate (count) on single field, on a simple data set.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void externalAggregateTestSingleFieldSimpleData() throws Exception {
+ JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] {
- new FileSplit(NC2_ID, new FileReference(new File(
- "data/wordcount.tsv"))),
- new FileSplit(NC1_ID, new FileReference(new File(
- "data/wordcount.tsv"))) });
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
+ new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
- // Input format: a string field as the key
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ // Input format: a string field as the key
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- // Output format: a string field as the key, and an integer field as the
- // count
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ // Output format: a string field as the key, and an integer field as the
+ // count
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- // Data set format: word(string),count(int)
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec,
- splitProvider,
- new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
- ','), desc);
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- csvScanner.setPartitionConstraint(csvPartitionConstraint);
+ // Data set format: word(string),count(int)
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ csvScanner.setPartitionConstraint(csvPartitionConstraint);
- int[] keys = new int[] { 0 };
- int tableSize = 8;
+ int[] keys = new int[] { 0 };
+ int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
- spec, // Job conf
- keys, // Group key
- 3, // Number of frames
- false, // Whether to sort the output
- // Hash partitioner
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- // Key comparator
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- // Aggregator factory
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outputRec, // Output format
- tableSize // Size of the hashing table, which is used to control
- // the partition when hashing
- );
+ ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
+ spec, // Job conf
+ keys, // Group key
+ 3, // Number of frames
+ false, // Whether to sort the output
+ // Hash partitioner
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ // Key comparator
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ // Aggregator factory
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ outputRec, // Output format
+ tableSize // Size of the hash table, which controls how the
+ // groups are partitioned during hashing
+ );
- PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- grouper.setPartitionConstraint(grouperPartitionConstraint);
+ PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ grouper.setPartitionConstraint(grouperPartitionConstraint);
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- printer.setPartitionConstraint(printerPartitionConstraint);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- /**
- * Test 02: Control experiment using in-memory aggregator, on the same data
- * set of {@link #externalAggregateTest01()}
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestSingleFieldSimpleDataInMemControl()
- throws Exception {
- JobSpecification spec = new JobSpecification();
+ /**
+ * Test 02: Control experiment using in-memory aggregator, on the same data
+ * set of {@link #externalAggregateTestSingleFieldSimpleData()}
+ *
+ * @throws Exception
+ */
+ @Test
+ public void externalAggregateTestSingleFieldSimpleDataInMemControl() throws Exception {
+ JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] {
- new FileSplit(NC2_ID, new FileReference(new File(
- "data/wordcount.tsv"))),
- new FileSplit(NC1_ID, new FileReference(new File(
- "data/wordcount.tsv"))) });
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
+ new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec,
- splitProvider,
- new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
- ','), desc);
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- csvScanner.setPartitionConstraint(csvPartitionConstraint);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ csvScanner.setPartitionConstraint(csvPartitionConstraint);
- int[] keys = new int[] { 0 };
- int tableSize = 8;
+ int[] keys = new int[] { 0 };
+ int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keys,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ outputRec, tableSize);
- PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- grouper.setPartitionConstraint(grouperPartitionConstraint);
+ PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ grouper.setPartitionConstraint(grouperPartitionConstraint);
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID) });
- printer.setPartitionConstraint(printerPartitionConstraint);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- /**
- * Test 03: aggregates on multiple fields
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestMultiAggFields() throws Exception {
- JobSpecification spec = new JobSpecification();
+ /**
+ * Test 03: aggregates on multiple fields
+ *
+ * @throws Exception
+ */
+ @Test
+ public void externalAggregateTestMultiAggFields() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
- ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
- spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(csvPartitionConstraint);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(csvPartitionConstraint);
- int[] keys = new int[] { 0 };
- int tableSize = 8;
+ int[] keys = new int[] { 0 };
+ int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
- spec,
- keys,
- 3,
- false,
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] {
- new CountAggregatorFactory(),
- new SumAggregatorFactory(4),
- new MinMaxAggregatorFactory(true, 5) }),
- outputRec, tableSize);
+ ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+ new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec, tableSize);
- PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- grouper.setPartitionConstraint(grouperPartitionConstraint);
+ PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ grouper.setPartitionConstraint(grouperPartitionConstraint);
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keys,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, ordScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- printer.setPartitionConstraint(printerPartitionConstraint);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- /**
- * Test 05: aggregate on multiple key fields
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestMultiKeys() throws Exception {
- JobSpecification spec = new JobSpecification();
+ /**
+ * Test 05: aggregate on multiple key fields
+ *
+ * @throws Exception
+ */
+ @Test
+ public void externalAggregateTestMultiKeys() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
- ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
- spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(csvPartitionConstraint);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(csvPartitionConstraint);
- // Group on two fields
- int[] keys = new int[] { 0, 1 };
- int tableSize = 8;
+ // Group on two fields
+ int[] keys = new int[] { 0, 1 };
+ int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
- spec,
- keys,
- 3,
- false,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] {
- new CountAggregatorFactory(),
- new SumAggregatorFactory(4),
- new MinMaxAggregatorFactory(true, 5) }),
- outputRec, tableSize);
+ ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+ new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
+ tableSize);
- PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- grouper.setPartitionConstraint(grouperPartitionConstraint);
+ PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ grouper.setPartitionConstraint(grouperPartitionConstraint);
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, ordScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- printer.setPartitionConstraint(printerPartitionConstraint);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- /**
- * Test 06: tests on non-string key field
- *
- * @throws Exception
- */
- @Test
- public void externalAggregateTestNonStringKey() throws Exception {
- JobSpecification spec = new JobSpecification();
+ /**
+ * Test 06: aggregate on non-string (integer) key fields
+ *
+ * @throws Exception
+ */
+ @Test
+ public void externalAggregateTestNonStringKey() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) };
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
- ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
- spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
- PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- ordScanner.setPartitionConstraint(csvPartitionConstraint);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(csvPartitionConstraint);
- // Group on two fields
- int[] keys = new int[] { 0, 1 };
- int tableSize = 8;
+ // Group on two fields
+ int[] keys = new int[] { 0, 1 };
+ int tableSize = 8;
- ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
- spec, keys, 3000, true, new FieldHashPartitionComputerFactory(
- keys, new IBinaryHashFunctionFactory[] {
- IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] {
- new CountAggregatorFactory(),
- new SumAggregatorFactory(4),
- new MinMaxAggregatorFactory(true, 5) }),
- outputRec, tableSize);
+ ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3000, true,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
+ new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
+ tableSize);
- PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- grouper.setPartitionConstraint(grouperPartitionConstraint);
+ PartitionConstraint grouperPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ grouper.setPartitionConstraint(grouperPartitionConstraint);
- IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, ordScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, ordScanner, 0, grouper, 0);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(
- NC1_ID) });
- printer.setPartitionConstraint(printerPartitionConstraint);
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
}
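Conceptually, the count aggregation exercised by the tests above groups the input on its first field and counts the rows in each group. A minimal plain-Java sketch of that semantics, independent of the Hyracks operators (the ',' delimiter and the data/wordcount.tsv path are taken from the tests; everything else is illustrative), is:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) throws IOException {
        // Group on the first ','-delimited field and count occurrences,
        // mirroring the (word, count) output format checked by the tests.
        Map<String, Integer> counts = new HashMap<String, Integer>();
        BufferedReader reader = new BufferedReader(new FileReader("data/wordcount.tsv"));
        String line;
        while ((line = reader.readLine()) != null) {
            String key = line.split(",")[0];
            Integer old = counts.get(key);
            counts.put(key, old == null ? 1 : old + 1);
        }
        reader.close();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}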
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 9b88062..f02c855 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -65,308 +65,243 @@
/**
* The application client for the performance tests of the external hash group
* operator.
- *
*/
public class ExternalGroupClient {
- private static class Options {
- @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
- public String host;
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
- @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
- public int port = 1099;
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+ public int port = 1099;
- @Option(name = "-app", usage = "Hyracks Application name", required = true)
- public String app;
+ @Option(name = "-app", usage = "Hyracks Application name", required = true)
+ public String app;
- @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
- public String inFileSplits;
+ @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+ public String inFileSplits;
- @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
- public String outFileSplits;
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
- @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
- public int htSize = 8191;
+ @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+ public int htSize = 8191;
- @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
- public int framesLimit = 32768;
+ @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
+ public int framesLimit = 32768;
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
- public int sbSize = 512;
+ @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
+ public int sbSize = 512;
- @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
- public boolean sortOutput = false;
+ @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
+ public boolean sortOutput = false;
- @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
- public boolean outPlain = true;
- }
+ @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+ public boolean outPlain = true;
+ }
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
- options.port);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
- JobSpecification job;
+ JobSpecification job;
- for (int i = 0; i < 3; i++) {
- long start = System.currentTimeMillis();
- job = createJob(parseFileSplits(options.inFileSplits),
- parseFileSplits(options.outFileSplits, i % 2),
- options.htSize, options.sbSize, options.framesLimit,
- options.sortOutput, i % 2, options.outPlain);
+ for (int i = 0; i < 3; i++) {
+ long start = System.currentTimeMillis();
+ job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
+ options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 2, options.outPlain);
- System.out.print(i + "\t" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(options.app, job);
- hcc.start(jobId);
- hcc.waitForCompletion(jobId);
- System.out.println("\t" + (System.currentTimeMillis() - start));
- }
- }
+ System.out.print(i + "\t" + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+ UUID jobId = hcc.createJob(options.app, job);
+ hcc.start(jobId);
+ hcc.waitForCompletion(jobId);
+ System.out.println("\t" + (System.currentTimeMillis() - start));
+ }
+ }
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s
- + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
- new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
- private static FileSplit[] parseFileSplits(String fileSplits, int count) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s
- + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
- new File(s.substring(idx + 1) + "_" + count)));
- }
- return fSplits;
- }
+ private static FileSplit[] parseFileSplits(String fileSplits, int count) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+ + count)));
+ }
+ return fSplits;
+ }
- private static JobSpecification createJob(FileSplit[] inSplits,
- FileSplit[] outSplits, int htSize, int sbSize, int framesLimit,
- boolean sortOutput, int alg, boolean outPlain) {
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(
- inSplits);
+ private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
+ int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
- RecordDescriptor inDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(
- spec, splitsProvider, new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] {
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'),
- inDesc);
+ FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
- fileScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
+ fileScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
- // Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- // IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ // Output: each unique key (an integer here) with an integer count
+ RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ // IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- // Specify the grouping key, which will be the string extracted during
- // the scan.
- int[] keys = new int[] { 0,
- // 1
- };
+ // Specify the grouping key field(s), here the first integer field
+ // extracted during the scan.
+ int[] keys = new int[] { 0,
+ // 1
+ };
- AbstractOperatorDescriptor grouper;
+ AbstractOperatorDescriptor grouper;
- switch (alg) {
- case 0: // External hash group
- grouper = new ExternalHashGroupOperatorDescriptor(
- spec,
- keys,
- framesLimit,
- false,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc, htSize);
+ switch (alg) {
+ case 0: // External hash group
+ grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
- grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+ grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
- break;
- case 1: // External sort + pre-cluster
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, framesLimit, keys, new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
- sorter.setPartitionConstraint(createPartitionConstraint(inSplits));
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
+ break;
+ case 1: // External sort + pre-cluster
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+ sorter.setPartitionConstraint(createPartitionConstraint(inSplits));
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
+ // Connect scan operator with the sorter
+ IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
- grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keys,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc);
+ grouper = new PreclusteredGroupOperatorDescriptor(spec, keys, new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc);
- grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+ grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
- spec);
- spec.connect(sortGroupConn, sorter, 0, grouper, 0);
- break;
- case 2: // In-memory hash group
- grouper = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc, htSize);
+ // Connect sorter with the pre-cluster
+ OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(sortGroupConn, sorter, 0, grouper, 0);
+ break;
+ case 2: // In-memory hash group
+ grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
- grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+ grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanConn, fileScanner, 0, grouper, 0);
- break;
- default:
- grouper = new ExternalHashGroupOperatorDescriptor(
- spec,
- keys,
- framesLimit,
- false,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- outDesc, htSize);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanConn, fileScanner, 0, grouper, 0);
+ break;
+ default:
+ grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
- grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
+ grouper.setPartitionConstraint(createPartitionConstraint(outSplits));
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
- }
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+ }
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
- outSplits);
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
- AbstractSingleActivityOperatorDescriptor writer;
+ AbstractSingleActivityOperatorDescriptor writer;
- if (outPlain)
- writer = new PlainFileWriterOperatorDescriptor(spec,
- outSplitProvider, "|");
- else
- writer = new FrameFileWriterOperatorDescriptor(spec,
- outSplitProvider);
+ if (outPlain)
+ writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+ else
+ writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
- writer.setPartitionConstraint(createPartitionConstraint(outSplits));
+ writer.setPartitionConstraint(createPartitionConstraint(outSplits));
- IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(
- spec);
- spec.connect(groupOutConn, grouper, 0, writer, 0);
+ IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(groupOutConn, grouper, 0, writer, 0);
- spec.addRoot(writer);
- return spec;
- }
+ spec.addRoot(writer);
+ return spec;
+ }
- private static PartitionConstraint createPartitionConstraint(
- FileSplit[] splits) {
- LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- lConstraints[i] = new AbsoluteLocationConstraint(
- splits[i].getNodeName());
- }
- return new ExplicitPartitionConstraint(lConstraints);
- }
+ private static PartitionConstraint createPartitionConstraint(FileSplit[] splits) {
+ LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ lConstraints[i] = new AbsoluteLocationConstraint(splits[i].getNodeName());
+ }
+ return new ExplicitPartitionConstraint(lConstraints);
+ }
}
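
For readers skimming this formatting-only change, the switch above picks one of three group-by strategies based on `alg`: case 0 builds an ExternalHashGroupOperatorDescriptor, case 1 runs an ExternalSortOperatorDescriptor into a PreclusteredGroupOperatorDescriptor over a OneToOneConnectorDescriptor, case 2 uses the in-memory HashGroupOperatorDescriptor, and any other value falls back to the external hash group. The sketch below restates that selection with placeholder types so it compiles on its own; every name in it (GrouperSketch, OperatorSketch, chooseGrouper) is an illustrative assumption, not part of the Hyracks API.

// Minimal, self-contained sketch of the algorithm selection shown in the
// reformatted switch above. Real Hyracks operator descriptors are replaced by
// a placeholder interface so the example compiles on its own; all names here
// are illustrative, not Hyracks classes.
public final class GrouperSketch {

    interface OperatorSketch {
    }

    static OperatorSketch externalHashGroup() {
        // Stands in for ExternalHashGroupOperatorDescriptor (case 0 and the default branch).
        return new OperatorSketch() {
        };
    }

    static OperatorSketch externalSortThenPreclusteredGroup() {
        // Stands in for ExternalSortOperatorDescriptor feeding a
        // PreclusteredGroupOperatorDescriptor (case 1).
        return new OperatorSketch() {
        };
    }

    static OperatorSketch inMemoryHashGroup() {
        // Stands in for HashGroupOperatorDescriptor (case 2).
        return new OperatorSketch() {
        };
    }

    // Mirrors the switch over 'alg' in the diff.
    static OperatorSketch chooseGrouper(int alg) {
        switch (alg) {
            case 0:
                return externalHashGroup();
            case 1:
                return externalSortThenPreclusteredGroup();
            case 2:
                return inMemoryHashGroup();
            default:
                return externalHashGroup();
        }
    }

    public static void main(String[] args) {
        // Example: select the in-memory hash grouper (alg == 2).
        OperatorSketch grouper = chooseGrouper(2);
        System.out.println(grouper != null ? "grouper selected" : "none");
    }
}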