Update issue #52: added initial version of PreCluster grouper operator; added test case for PreCluster grouper.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@880 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
new file mode 100644
index 0000000..116992d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class PreclusteredGroupOperatorDescriptor extends
+        AbstractSingleActivityOperatorDescriptor {
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+
+    private static final long serialVersionUID = 1L;
+
+    public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
+            int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(
+            final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions) throws HyracksDataException {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        final RecordDescriptor inRecordDesc = recordDescProvider
+                .getInputRecordDescriptor(getOperatorId(), 0);
+        final IAggregatorDescriptor aggregator = aggregatorFactory
+                .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
+                        groupFields);
+        final ByteBuffer copyFrame = ctx.allocateFrame();
+        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
+                ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+        ByteBuffer outFrame = ctx.allocateFrame();
+        final FrameTupleAppender appender = new FrameTupleAppender(
+                ctx.getFrameSize());
+        appender.reset(outFrame, true);
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            private PreclusteredGroupWriter pgw;
+
+            @Override
+            public void open() throws HyracksDataException {
+                pgw = new PreclusteredGroupWriter(ctx, groupFields,
+                        comparators, aggregator, inRecordDesc, writer);
+                pgw.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer)
+                    throws HyracksDataException {
+                pgw.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                pgw.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                pgw.close();
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
new file mode 100644
index 0000000..70f25d9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class PreclusteredGroupWriter implements IFrameWriter {
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+    private final IFrameWriter writer;
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    private boolean first;
+
+    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.aggregator = aggregator;
+        this.aggregateState = aggregator.createAggregateStates();
+        this.writer = writer;
+        copyFrame = ctx.allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+        outFrame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                aggregator.init(null, inFrameAccessor, i, aggregateState);
+                first = false;
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                } else {
+                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                }
+                aggregator.aggregate(inFrameAccessor, i, null, 0, aggregateState);
+            }
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
+            aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+        }
+    }
+
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
+        if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+            if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
+                throw new IllegalStateException();
+            }
+        }
+        aggregator.reset();
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(appender.getBuffer(), writer);
+            }
+        }
+        writer.close();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index 7fe636a..fcc961f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -186,6 +186,10 @@
                         }
                     }
                 }
+                // For pre-cluster: no output state is needed
+                if(appender == null){
+                    return true;
+                }
                 if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
                         stateTupleBuilder.getByteArray(), 0,
                         stateTupleBuilder.getSize())) {
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 45468e7..6ebf22d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
@@ -169,6 +170,57 @@
         spec.addRoot(printer);
         runTest(spec);
     }
+    
+    @Test
+    public void singleKeySumPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
     @Test
     public void singleKeySumExtGroupTest() throws Exception {
@@ -286,6 +338,57 @@
         spec.addRoot(printer);
         runTest(spec);
     }
+    
+    @Test
+    public void singleKeyAvgPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new CountFieldAggregatorFactory(true),
+                        new AvgFieldAggregatorFactory(1, true) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
     @Test
     public void singleKeyAvgExtGroupTest() throws Exception {
@@ -407,6 +510,57 @@
     }
 
     @Test
+    public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+                outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    
+    @Test
     public void singleKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();