Update issue #52: rewrote GroupingHashTable to use the aggregator for key-field output; fixed an incorrect-initialization bug in the pre-clustered grouper; ran all integration tests for the new aggregation interfaces with a larger dataset (so that the external grouper spills).
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@893 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index 8c6338d..f38eeb3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -73,7 +73,6 @@
private int accumulatorSize;
private int lastBIndex;
- private final int[] fields;
private final int[] storedKeys;
private final IBinaryComparator[] comparators;
private final FrameTuplePairComparator ftpc;
@@ -94,7 +93,6 @@
buffers = new ArrayList<ByteBuffer>();
table = new Link[tableSize];
- this.fields = fields;
storedKeys = new int[fields.length];
@SuppressWarnings("rawtypes")
ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
@@ -163,20 +161,22 @@
// Did not find the key. Insert a new entry.
saIndex = accumulatorSize++;
// Add keys
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- addNewBuffer();
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- throw new IllegalStateException();
- }
- }
+
// Add index to the keys in frame
int sbIndex = lastBIndex;
- int stIndex = appender.getTupleCount() - 1;
+ int stIndex = appender.getTupleCount();
// Add aggregation fields
AggregateState newState = aggregator.createAggregateStates();
- aggregator.init(null, accessor, tIndex, newState);
+ if(!aggregator.init(appender, accessor, tIndex, newState)){
+ addNewBuffer();
+ sbIndex = lastBIndex;
+ stIndex = appender.getTupleCount();
+ if(!aggregator.init(appender, accessor, tIndex, newState)){
+ throw new IllegalStateException();
+ }
+ }
if (accumulatorSize >= aggregateStates.length) {
aggregateStates = Arrays.copyOf(aggregateStates,
@@ -210,7 +210,6 @@
tIndex, aggregateStates[aIndex])) {
flushFrame(appender, writer);
}
- aggregator.reset();
}
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
index 70f25d9..f8e10ef 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -74,7 +74,7 @@
} else {
switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
}
- aggregator.aggregate(inFrameAccessor, i, null, 0, aggregateState);
+
}
}
FrameUtils.copy(buffer, copyFrame);
@@ -85,6 +85,8 @@
if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
writeOutput(prevTupleAccessor, prevTupleIndex);
aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+ } else {
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
}
}
@@ -97,7 +99,6 @@
throw new IllegalStateException();
}
}
- aggregator.reset();
}
private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index c3c98cc..6f92cf1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -136,10 +136,12 @@
AggregateState state) throws HyracksDataException {
if (!outputPending) {
resultTupleBuilder.reset();
+
for (int i = 0; i < keyFields.length; i++) {
resultTupleBuilder.addField(accessor, tIndex,
keyFields[i]);
}
+
DataOutput dos = resultTupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);