Update issue #52: added initial version of PreCluster grouper operator; added test case for PreCluster grouper.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@880 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
new file mode 100644
index 0000000..116992d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * Single-activity, one-input/one-output operator descriptor whose runtime
+ * delegates every frame to a {@link PreclusteredGroupWriter}.
+ * <p>
+ * The writer decides group boundaries by comparing each tuple only with the
+ * tuple immediately before it, so tuples that belong to the same group are
+ * expected to arrive contiguously.
+ */
+public class PreclusteredGroupOperatorDescriptor extends
+        AbstractSingleActivityOperatorDescriptor {
+    /** Indexes of the input fields that form the grouping key. */
+    private final int[] groupFields;
+    /** One comparator factory per grouping field, in the same order as {@link #groupFields}. */
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    /** Factory for the aggregator that accumulates and emits each group's result. */
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param spec                job specification this operator is registered with
+     * @param groupFields         indexes of the grouping-key fields in the input record
+     * @param comparatorFactories factories for the comparators applied to the grouping fields
+     * @param aggregatorFactory   factory creating the aggregator used by the runtime
+     * @param recordDescriptor    descriptor of the (single) output record
+     */
+    public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
+            int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    /**
+     * Builds the push runtime for one partition: instantiates the key
+     * comparators and the aggregator, then returns a pushable that forwards
+     * open/nextFrame/fail/close to a {@link PreclusteredGroupWriter}.
+     *
+     * @param ctx                task context passed to the aggregator factory and the writer
+     * @param recordDescProvider source of the input record descriptor
+     * @param partition          index of this partition (not used here)
+     * @param nPartitions        total number of partitions (not used here)
+     * @return the operator runtime for this partition
+     * @throws HyracksDataException declared by the overridden method
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(
+            final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions) throws HyracksDataException {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        final RecordDescriptor inRecordDesc = recordDescProvider
+                .getInputRecordDescriptor(getOperatorId(), 0);
+        final IAggregatorDescriptor aggregator = aggregatorFactory
+                .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
+                        groupFields);
+        // No frames are allocated here: PreclusteredGroupWriter allocates its
+        // own copy and output frames in its constructor.
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            // Created in open(), once the downstream writer is available.
+            private PreclusteredGroupWriter pgw;
+
+            @Override
+            public void open() throws HyracksDataException {
+                pgw = new PreclusteredGroupWriter(ctx, groupFields,
+                        comparators, aggregator, inRecordDesc, writer);
+                pgw.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer)
+                    throws HyracksDataException {
+                pgw.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                pgw.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                pgw.close();
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
new file mode 100644
index 0000000..70f25d9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Frame writer that aggregates a stream of tuples group by group and pushes
+ * one result per group to a downstream writer.
+ * <p>
+ * A group boundary is detected by comparing the grouping fields of each tuple
+ * with those of the tuple immediately before it; the last input frame is kept
+ * as a copy so the comparison also works across frame boundaries.
+ */
+public class PreclusteredGroupWriter implements IFrameWriter {
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+    private final IFrameWriter writer;
+    /** Copy of the most recent non-empty input frame. */
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    /** True until the first tuple of the stream has been seen. */
+    private boolean first;
+
+    /**
+     * @param ctx          task context used to allocate the copy and output frames
+     * @param groupFields  indexes of the grouping-key fields in the input record
+     * @param comparators  comparators for the grouping fields, in the same order as {@code groupFields}
+     * @param aggregator   aggregator that accumulates and emits each group's result
+     * @param inRecordDesc descriptor of the input record
+     * @param writer       downstream writer receiving the result frames
+     */
+    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.aggregator = aggregator;
+        this.aggregateState = aggregator.createAggregateStates();
+        this.writer = writer;
+        copyFrame = ctx.allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+        outFrame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+    }
+
+    /** Opens the downstream writer and marks the stream as not yet started. */
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
+
+    /**
+     * Processes every tuple of {@code buffer}: the first tuple of the stream
+     * initializes the aggregate state; each later tuple either joins the
+     * current group or closes it and starts a new one.
+     *
+     * @param buffer the incoming frame
+     * @throws HyracksDataException if emitting a finished group fails
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        if (nTuples == 0) {
+            // Nothing to aggregate. Keep the previous frame copy so that its
+            // last tuple stays available for the next boundary check and for close().
+            return;
+        }
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                aggregator.init(null, inFrameAccessor, i, aggregateState);
+                first = false;
+            } else if (i == 0) {
+                // The predecessor is the last tuple of the previously copied frame.
+                switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+            } else {
+                switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+            }
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    /**
+     * Folds the current tuple into the running group, or, when its grouping
+     * fields differ from the previous tuple's, emits the finished group and
+     * starts a new one with the current tuple.
+     * <p>
+     * The tuple handed to {@code init} is not passed to {@code aggregate}
+     * again: the very first tuple of the stream is only ever given to
+     * {@code init}, so {@code init} is treated as consuming its tuple, and
+     * aggregating it a second time would count it twice.
+     */
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
+            aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+        }
+    }
+
+    /**
+     * Appends the current group's final result to the output frame, flushing
+     * the frame downstream first if the result does not fit, then resets the
+     * aggregator.
+     *
+     * @throws IllegalStateException if the result does not fit even into a freshly reset frame
+     */
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
+        if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+            if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
+                throw new IllegalStateException("Aggregation result does not fit into an empty output frame");
+            }
+        }
+        aggregator.reset();
+    }
+
+    /**
+     * Returns true when every grouping field of tuple {@code t1Idx} in
+     * {@code a1} compares equal to the same field of tuple {@code t2Idx} in
+     * {@code a2}.
+     */
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            // Field bytes start after the tuple start offset and the field-slot area.
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Propagates the failure to the downstream writer. */
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /**
+     * Emits the last open group (if any tuple was seen), flushes any pending
+     * output and closes the downstream writer.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            // The last tuple seen is the last tuple of the copied frame.
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(appender.getBuffer(), writer);
+            }
+        }
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index 7fe636a..fcc961f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -186,6 +186,10 @@
}
}
}
+ // For pre-cluster: no output state is needed
+ if(appender == null){
+ return true;
+ }
if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
stateTupleBuilder.getByteArray(), 0,
stateTupleBuilder.getSize())) {
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 45468e7..6ebf22d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
@@ -169,6 +170,57 @@
spec.addRoot(printer);
runTest(spec);
}
+
+ /**
+  * Builds and runs a job scan -> hash partition on field 0 -> pre-clustered
+  * grouper with two IntSum aggregators -> printer.
+  */
+ @Test
+ public void singleKeySumPreClusterGroupTest() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+             spec, splitProvider, tupleParserFactory, desc);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+             csvScanner, NC2_ID);
+
+     // Output record: the string key followed by two integer aggregates.
+     RecordDescriptor outputRec = new RecordDescriptor(
+             new ISerializerDeserializer[] {
+                     UTF8StringSerializerDeserializer.INSTANCE,
+                     IntegerSerializerDeserializer.INSTANCE,
+                     IntegerSerializerDeserializer.INSTANCE });
+
+     int[] keyFields = new int[] { 0 };
+
+     // NOTE(review): the scanner output is hash-partitioned but not sorted
+     // before it reaches the pre-clustered grouper; whether equal keys
+     // arrive contiguously depends on the input file - confirm.
+     PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+             spec,
+             keyFields,
+             new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+             new MultiFieldsAggregatorFactory(
+                     new IFieldAggregateDescriptorFactory[] {
+                             new IntSumFieldAggregatorFactory(1, true),
+                             new IntSumFieldAggregatorFactory(3, true) }),
+             outputRec);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+             spec,
+             new FieldHashPartitionComputerFactory(
+                     keyFields,
+                     new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+     spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+     // Label the printer with this test's own name.
+     AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+             "singleKeySumPreClusterGroupTest");
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+     spec.connect(conn2, grouper, 0, printer, 0);
+
+     spec.addRoot(printer);
+     runTest(spec);
+ }
@Test
public void singleKeySumExtGroupTest() throws Exception {
@@ -286,6 +338,57 @@
spec.addRoot(printer);
runTest(spec);
}
+
+ /**
+  * Builds and runs a job scan -> hash partition on field 0 -> pre-clustered
+  * grouper with IntSum, Count and Avg aggregators -> printer.
+  */
+ @Test
+ public void singleKeyAvgPreClusterGroupTest() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+             spec, splitProvider, tupleParserFactory, desc);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+             csvScanner, NC2_ID);
+
+     // Output record: the string key, two integer aggregates and one float aggregate.
+     RecordDescriptor outputRec = new RecordDescriptor(
+             new ISerializerDeserializer[] {
+                     UTF8StringSerializerDeserializer.INSTANCE,
+                     IntegerSerializerDeserializer.INSTANCE,
+                     IntegerSerializerDeserializer.INSTANCE,
+                     FloatSerializerDeserializer.INSTANCE });
+
+     int[] keyFields = new int[] { 0 };
+
+     // NOTE(review): the scanner output is hash-partitioned but not sorted
+     // before it reaches the pre-clustered grouper; whether equal keys
+     // arrive contiguously depends on the input file - confirm.
+     PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+             spec,
+             keyFields,
+             new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+             new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                     new IntSumFieldAggregatorFactory(1, true),
+                     new CountFieldAggregatorFactory(true),
+                     new AvgFieldAggregatorFactory(1, true) }), outputRec);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+             spec,
+             new FieldHashPartitionComputerFactory(
+                     keyFields,
+                     new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+     spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+     // Label the printer with this test's own name.
+     AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+             "singleKeyAvgPreClusterGroupTest");
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+     spec.connect(conn2, grouper, 0, printer, 0);
+
+     spec.addRoot(printer);
+     runTest(spec);
+ }
@Test
public void singleKeyAvgExtGroupTest() throws Exception {
@@ -407,6 +510,57 @@
}
@Test
+ public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+     // Job shape: scan -> hash partition on field 0 -> pre-clustered grouper
+     // with an IntSum and a MinMaxString aggregator -> printer.
+     JobSpecification spec = new JobSpecification();
+
+     FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+             spec, splitProvider, tupleParserFactory, desc);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+             csvScanner, NC2_ID);
+
+     // Output record: the string key, one integer aggregate and one string aggregate.
+     RecordDescriptor outputRec = new RecordDescriptor(
+             new ISerializerDeserializer[] {
+                     UTF8StringSerializerDeserializer.INSTANCE,
+                     IntegerSerializerDeserializer.INSTANCE,
+                     UTF8StringSerializerDeserializer.INSTANCE });
+
+     int[] keyFields = new int[] { 0 };
+
+     // NOTE(review): the scanner output is hash-partitioned but not sorted
+     // before it reaches the pre-clustered grouper; whether equal keys
+     // arrive contiguously depends on the input file - confirm.
+     PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+             spec,
+             keyFields,
+             new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+             new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                     new IntSumFieldAggregatorFactory(1, true),
+                     new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+             outputRec);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+             spec,
+             new FieldHashPartitionComputerFactory(
+                     keyFields,
+                     new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+     spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+     // Label the printer with this test's own name.
+     AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+             "singleKeyMinMaxStringPreClusterGroupTest");
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+             NC2_ID, NC1_ID);
+
+     IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+     spec.connect(conn2, grouper, 0, printer, 0);
+
+     spec.addRoot(printer);
+     runTest(spec);
+ }
+
+
+ @Test
public void singleKeyMinMaxStringExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();