Update issue #52: rewrote GroupingHashTable to use aggregator for key fields output; fixed a bug of incorrect initialization in pre-cluster grouper; tested all integration tests for new aggregation interfaces with larger dataset (so external grouper spills). 

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@893 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index 8c6338d..f38eeb3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -73,7 +73,6 @@
     private int accumulatorSize;
 
     private int lastBIndex;
-    private final int[] fields;
     private final int[] storedKeys;
     private final IBinaryComparator[] comparators;
     private final FrameTuplePairComparator ftpc;
@@ -94,7 +93,6 @@
         buffers = new ArrayList<ByteBuffer>();
         table = new Link[tableSize];
 
-        this.fields = fields;
         storedKeys = new int[fields.length];
         @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
@@ -163,20 +161,22 @@
             // Did not find the key. Insert a new entry.
             saIndex = accumulatorSize++;
             // Add keys
-            if (!appender.appendProjection(accessor, tIndex, fields)) {
-                addNewBuffer();
-                if (!appender.appendProjection(accessor, tIndex, fields)) {
-                    throw new IllegalStateException();
-                }
-            }
+
             // Add index to the keys in frame
             int sbIndex = lastBIndex;
-            int stIndex = appender.getTupleCount() - 1;
+            int stIndex = appender.getTupleCount();
             
             // Add aggregation fields
             AggregateState newState = aggregator.createAggregateStates();
             
-            aggregator.init(null, accessor, tIndex, newState);
+            if(!aggregator.init(appender, accessor, tIndex, newState)){
+                addNewBuffer();
+                sbIndex = lastBIndex;
+                stIndex = appender.getTupleCount();
+                if(!aggregator.init(appender, accessor, tIndex, newState)){
+                    throw new IllegalStateException();
+                }
+            }
             
             if (accumulatorSize >= aggregateStates.length) {
                 aggregateStates = Arrays.copyOf(aggregateStates,
@@ -210,7 +210,6 @@
                                     tIndex, aggregateStates[aIndex])) {
                         flushFrame(appender, writer);
                     }
-                    aggregator.reset();
                 }
             }
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
index 70f25d9..f8e10ef 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -74,7 +74,7 @@
                 } else {
                     switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
                 }
-                aggregator.aggregate(inFrameAccessor, i, null, 0, aggregateState);
+                
             }
         }
         FrameUtils.copy(buffer, copyFrame);
@@ -85,6 +85,8 @@
         if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
             writeOutput(prevTupleAccessor, prevTupleIndex);
             aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
         }
     }
 
@@ -97,7 +99,6 @@
                 throw new IllegalStateException();
             }
         }
-        aggregator.reset();
     }
 
     private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index c3c98cc..6f92cf1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -136,10 +136,12 @@
                     AggregateState state) throws HyracksDataException {
                 if (!outputPending) {
                     resultTupleBuilder.reset();
+                    
                     for (int i = 0; i < keyFields.length; i++) {
                         resultTupleBuilder.addField(accessor, tIndex,
                                 keyFields[i]);
                     }
+ 
                     DataOutput dos = resultTupleBuilder.getDataOutput();
 
                     int tupleOffset = accessor.getTupleStartOffset(tIndex);