Update issue #52:

Added integration test for new aggregator interface: contains tests for sum aggregation on both in-mem and external group operators.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@861 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
new file mode 100644
index 0000000..9f3c0d6
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+
+/**
+ *
+ */
+public class AggregationTests extends AbstractIntegrationTest {
+
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                    "data/tpch0.001/lineitem.tbl"))) });
+
+    final RecordDescriptor desc = new RecordDescriptor(
+            new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE });
+
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE, }, '|');
+
+    private AbstractSingleActivityOperatorDescriptor getPrinter(
+            JobSpecification spec, String prefix) throws IOException {
+
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+                spec, new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile()
+                                .getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile()
+                                .getAbsolutePath()) }), "\t");
+
+        return printer;
+    }
+
+    @Test
+    public void singleKeySumInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
+
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new IntSumAggregatorFactory(3) }, outputRec, tableSize);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeySumExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new IntSumAggregatorFactory(3) },
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new IntSumAggregatorFactory(3) },
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+}