Update issue #52

Added an AVG aggregator using the new interface; added two integration tests for the AVG aggregator.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@862 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
index 886dd06..704fe4e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -108,6 +108,7 @@
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
+                count += 1;
                 if (fieldOutput != null) {
                     try {
                         fieldOutput.writeInt(sum);
@@ -117,7 +118,7 @@
                                 "I/O exception when initializing the aggregator.");
                     }
                 } else {
-                    state.setState(new Object[]{sum, count});
+                    state.setState(new Integer[]{sum, count});
                 }
             }
             
@@ -180,7 +181,7 @@
                     Integer[] fields = (Integer[])state.getState();
                     sum += fields[0];
                     count += fields[1];
-                    state.setState(new Object[]{sum, count});
+                    state.setState(new Integer[]{sum, count});
                 }
             }
         };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index aa3f892..b829f5b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -399,6 +399,7 @@
                             if (currentTupleInOutFrame < 0
                                     || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
                                 /**
+                                 * When a new group starts:
                                  * Initialize the first output record
                                  * Reset the tuple builder
                                  */
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index af5d16b..b20a93d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -111,7 +111,6 @@
             private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);;
             private final TuplePointer storedTuplePointer = new TuplePointer();
             private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-            private int groupSize = 0;
             private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
                     inRecordDescriptor, outRecordDescriptor, keyFields);
 
@@ -124,7 +123,6 @@
 
             @Override
             public void reset() {
-                groupSize = 0;
                 dataFrameCount = -1;
                 tPointers = null;
                 table.reset();
@@ -175,7 +173,6 @@
                     storedTuplePointer.frameIndex = dataFrameCount;
                     storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
                     table.insert(entry, storedTuplePointer);
-                    groupSize++;
                 } else {
                     // If there is a matching found, do aggregation directly
                     int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
@@ -439,7 +436,6 @@
 
             @Override
             public void close() {
-                groupSize = 0;
                 dataFrameCount = -1;
                 tPointers = null;
                 table.close();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 9f3c0d6..c1bc9e9 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,6 +42,8 @@
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgMergeAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -226,4 +228,120 @@
         runTest(spec);
     }
 
+    @Test
+    public void singleKeyAvgInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
+
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new AvgAggregatorFactory(1) }, outputRec, tableSize);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeyAvgExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new AvgAggregatorFactory(1) },
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new AvgMergeAggregatorFactory(2) },
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    
 }