Update for issue #52
Added AVG aggregator using the new interface; added two integration tests for the AVG aggregator.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@862 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
index 886dd06..704fe4e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -108,6 +108,7 @@
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
+ count += 1;
if (fieldOutput != null) {
try {
fieldOutput.writeInt(sum);
@@ -117,7 +118,7 @@
"I/O exception when initializing the aggregator.");
}
} else {
- state.setState(new Object[]{sum, count});
+ state.setState(new Integer[]{sum, count});
}
}
@@ -180,7 +181,7 @@
Integer[] fields = (Integer[])state.getState();
sum += fields[0];
count += fields[1];
- state.setState(new Object[]{sum, count});
+ state.setState(new Integer[]{sum, count});
}
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index aa3f892..b829f5b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -399,6 +399,7 @@
if (currentTupleInOutFrame < 0
|| compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
/**
+ * When a new group is encountered:
* Initialize the first output record
* Reset the tuple builder
*/
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index af5d16b..b20a93d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -111,7 +111,6 @@
private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);;
private final TuplePointer storedTuplePointer = new TuplePointer();
private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
- private int groupSize = 0;
private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
inRecordDescriptor, outRecordDescriptor, keyFields);
@@ -124,7 +123,6 @@
@Override
public void reset() {
- groupSize = 0;
dataFrameCount = -1;
tPointers = null;
table.reset();
@@ -175,7 +173,6 @@
storedTuplePointer.frameIndex = dataFrameCount;
storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
table.insert(entry, storedTuplePointer);
- groupSize++;
} else {
// If there is a matching found, do aggregation directly
int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
@@ -439,7 +436,6 @@
@Override
public void close() {
- groupSize = 0;
dataFrameCount = -1;
tPointers = null;
table.close();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 9f3c0d6..c1bc9e9 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,6 +42,8 @@
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgMergeAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -226,4 +228,120 @@
runTest(spec);
}
+ @Test
+ public void singleKeyAvgInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumAggregatorFactory(1),
+ new AvgAggregatorFactory(1) }, outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 3;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumAggregatorFactory(1),
+ new AvgAggregatorFactory(1) },
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumAggregatorFactory(1),
+ new AvgMergeAggregatorFactory(2) },
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+
}