Update #52: fixed bugs on external grouper on merging phase; fixed bugs on aggregation integration tests. 

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@883 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index 680e7cb..31d406d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -39,7 +39,6 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -315,7 +314,6 @@
                         ctx.getFrameSize());
                 private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
                         ctx.getFrameSize(), recordDescriptors[0]);
-                private ArrayTupleBuilder finalTupleBuilder;
                 private FrameTupleAppender writerFrameAppender;
 
                 public void initialize() throws HyracksDataException {
@@ -450,11 +448,10 @@
                                  * Initialize the first output record Reset the
                                  * tuple builder
                                  */
-
-                                if (!aggregator.init(outFrameAppender, fta,
+                                if (!aggregator.initFromPartial(outFrameAppender, fta,
                                         tupleIndex, aggregateState)) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!aggregator.init(outFrameAppender, fta,
+                                    if (!aggregator.initFromPartial(outFrameAppender, fta,
                                             tupleIndex, aggregateState)) {
                                         throw new HyracksDataException(
                                                 "Failed to append an aggregation result to the output frame.");
@@ -501,10 +498,6 @@
 
                 private void flushOutFrame(IFrameWriter writer, boolean isFinal)
                         throws HyracksDataException {
-                    if (finalTupleBuilder == null) {
-                        finalTupleBuilder = new ArrayTupleBuilder(
-                                recordDescriptors[0].getFields().length);
-                    }
                     if (writerFrame == null) {
                         writerFrame = ctx.allocateFrame();
                     }
@@ -516,9 +509,6 @@
                     outFrameAccessor.reset(outFrame);
 
                     for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-                        for (int j = 0; j < keyFields.length; j++) {
-                            finalTupleBuilder.addField(outFrameAccessor, i, j);
-                        }
                         
                         if(isFinal){
                             if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
@@ -539,7 +529,6 @@
                                 }
                             }
                         }
-                        aggregator.reset();
                     }
                     if (writerFrameAppender.getTupleCount() > 0) {
                         FrameUtils.flushFrame(writerFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index 28a71ac..8c6338d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -65,6 +65,10 @@
     private final FrameTupleAppender appender;
     private final List<ByteBuffer> buffers;
     private final Link[] table;
+    /**
+     * Aggregate states: a list of states for all groups maintained in the main
+     * memory.
+     */
     private AggregateState[] aggregateStates;
     private int accumulatorSize;
 
@@ -165,12 +169,15 @@
                     throw new IllegalStateException();
                 }
             }
-            // Add aggregation fields
+            // Add index to the keys in frame
             int sbIndex = lastBIndex;
             int stIndex = appender.getTupleCount() - 1;
+            
+            // Add aggregation fields
             AggregateState newState = aggregator.createAggregateStates();
-            aggregator.init(appender, accessor, tIndex, newState);
-
+            
+            aggregator.init(null, accessor, tIndex, newState);
+            
             if (accumulatorSize >= aggregateStates.length) {
                 aggregateStates = Arrays.copyOf(aggregateStates,
                         aggregateStates.length * 2);
@@ -198,8 +205,9 @@
                     ByteBuffer keyBuffer = buffers.get(bIndex);
                     storedKeysAccessor.reset(keyBuffer);
 
-                    while (!aggregator.outputFinalResult(appender,
-                            storedKeysAccessor, tIndex, aggregateStates[aIndex])) {
+                    while (!aggregator
+                            .outputFinalResult(appender, storedKeysAccessor,
+                                    tIndex, aggregateStates[aIndex])) {
                         flushFrame(appender, writer);
                     }
                     aggregator.reset();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index 1299a1e..d13cc5d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -294,7 +294,6 @@
                                     appender.reset(outFrame, true);
                                 }
                             }
-                            aggregator.reset();
                         } while (true);
                     }
                     if (appender.getTupleCount() != 0) {
@@ -344,7 +343,6 @@
                             }
                         }
                     }
-                    aggregator.reset();
                 }
                 if (appender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
index 75c546f..3851f26 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
@@ -20,7 +20,12 @@
  *
  */
 public interface IAggregateStateFactory extends Serializable {
-
+    
+    /**
+     * Get the (partial) state length in binary. 
+     * 
+     * @return
+     */
     public int getStateLength();
     
     public Object createState();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
index 176806f..dc2a30e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
@@ -30,6 +30,11 @@
      */
     public AggregateState createAggregateStates();
     
+    /**
+     * Get the length of the binary states.
+     * 
+     * @return
+     */
     public int getAggregateStatesLength();
 
     /**
@@ -47,6 +52,22 @@
     public boolean init(FrameTupleAppender appender,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
+    
+    /**
+     * Initialize the state based on the partial results.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param fieldOutput
+     *            The data output for the frame containing the state. This may
+     *            be null, if the state is maintained as a java object
+     * @param state
+     *            The state to be initialized.
+     * @throws HyracksDataException
+     */
+    public boolean initFromPartial(FrameTupleAppender appender,
+            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 50da4cb..c4df536 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -27,13 +27,18 @@
     public IAggregateStateFactory getAggregateStateFactory();
 
     /**
-     * Initialize the state based on the input tuple.
+     * Initialize the state based on the input tuple. 
      * 
      * @param accessor
      * @param tIndex
      * @param fieldOutput
      *            The data output for the frame containing the state. This may
-     *            be null, if the state is maintained as a java object
+     *            be null, if the state is maintained as a java object. 
+     *            
+     *            Note that we have an assumption that the initialization of
+     *            the binary state (if any) inserts the state fields into the
+     *            buffer in a appending fashion. This means that an arbitrary
+     *            initial size of the state can be accquired.
      * @param state
      *            The state to be initialized.
      * @throws HyracksDataException
@@ -43,7 +48,10 @@
             throws HyracksDataException;
     
     /**
-     * Initialize the state based on the input tuple.
+     * Initialize the state by loading the partial results. This is specified
+     * since for some aggregations (like avg), the partial results and final 
+     * results are different, and different initialization methods should be 
+     * used.
      * 
      * @param accessor
      * @param tIndex
@@ -76,7 +84,11 @@
      * @param data
      *            The buffer containing the state, if frame-based-state is used.
      *            This means that it can be null if java-object-based-state is
-     *            used.
+     *            used. 
+     *            
+     *            Here the length of binary state can be obtains from the state
+     *            parameter, and if the content to be filled into that is over-
+     *            flowing (larger than the reversed space), error should be emit.
      * @param offset
      * @param state
      *            The aggregate state.
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
index 7db12f7..0695c5a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
@@ -105,13 +105,16 @@
             public void init(IFrameTupleAccessor accessor, int tIndex,
                     DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
+
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                
                 sum += IntegerSerializerDeserializer.getInt(accessor
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
+                
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
@@ -168,6 +171,7 @@
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
+                
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
                     sum += buf.getInt(offset);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
index b76c285..1ae7aa4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -42,10 +42,11 @@
     private final int aggField;
 
     private final boolean isMax;
-    
+
     private final boolean hasBinaryState;
 
-    public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
+    public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax,
+            boolean hasBinaryState) {
         this.aggField = aggField;
         this.isMax = isMax;
         this.hasBinaryState = hasBinaryState;
@@ -76,9 +77,10 @@
                     throws HyracksDataException {
                 try {
                     if (hasBinaryState) {
-                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+                        int stateIdx = IntegerSerializerDeserializer.getInt(
+                                data, offset);
                         Object[] storedState = (Object[]) state.getState();
-                        fieldOutput.writeUTF((String)storedState[stateIdx]);
+                        fieldOutput.writeUTF((String) storedState[stateIdx]);
                     } else {
                         fieldOutput.writeUTF((String) state.getState());
                     }
@@ -94,13 +96,11 @@
                     throws HyracksDataException {
                 try {
                     if (hasBinaryState) {
-                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+                        int stateIdx = IntegerSerializerDeserializer.getInt(
+                                data, offset);
                         Object[] storedState = (Object[]) state.getState();
-                        fieldOutput.writeUTF((String)storedState[stateIdx]);
+                        fieldOutput.writeUTF((String) storedState[stateIdx]);
                     } else {
-                        if(((String)state.getState()).equalsIgnoreCase("ic platelets lose carefully. blithely unu")){
-                            System.out.print("");
-                        }
                         fieldOutput.writeUTF((String) state.getState());
                     }
                 } catch (IOException e) {
@@ -124,11 +124,13 @@
                                         + fieldStart, fieldLength)));
                 if (hasBinaryState) {
                     // Object-binary-state
-                    Object[] storedState = (Object[]) state.getState();
-                    if (storedState == null) {
+                    Object[] storedState;
+                    if (state.getState() == null) {
                         storedState = new Object[8];
                         storedState[0] = new Integer(0);
                         state.setState(storedState);
+                    } else {
+                        storedState = (Object[]) state.getState();
                     }
                     int stateCount = (Integer) (storedState[0]);
                     if (stateCount + 1 >= storedState.length) {
@@ -173,6 +175,7 @@
                 if (hasBinaryState) {
                     int stateIdx = IntegerSerializerDeserializer.getInt(data,
                             offset);
+
                     Object[] storedState = (Object[]) state.getState();
 
                     if (isMax) {
@@ -204,24 +207,24 @@
             @Override
             public IAggregateStateFactory getAggregateStateFactory() {
                 return new IAggregateStateFactory() {
-                    
+
                     private static final long serialVersionUID = 1L;
 
                     @Override
                     public boolean hasObjectState() {
                         return true;
                     }
-                    
+
                     @Override
                     public boolean hasBinaryState() {
                         return hasBinaryState;
                     }
-                    
+
                     @Override
                     public int getStateLength() {
                         return 4;
                     }
-                    
+
                     @Override
                     public Object createState() {
                         return null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index fcc961f..c3c98cc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -82,7 +82,7 @@
 
         return new IAggregatorDescriptor() {
 
-            private boolean pending;
+            private boolean initPending, outputPending;
 
             @Override
             public void reset() {
@@ -91,14 +91,15 @@
                     aggregateStateFactories[i] = aggregators[i]
                             .getAggregateStateFactory();
                 }
-                pending = false;
+                initPending = false;
+                outputPending = false;
             }
 
             @Override
             public boolean outputPartialResult(FrameTupleAppender appender,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                if (!pending) {
+                if (!outputPending) {
                     resultTupleBuilder.reset();
                     for (int i = 0; i < keyFields.length; i++) {
                         resultTupleBuilder.addField(accessor, tIndex,
@@ -121,9 +122,10 @@
                 if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
                         resultTupleBuilder.getByteArray(), 0,
                         resultTupleBuilder.getSize())) {
-                    pending = true;
+                    outputPending = true;
                     return false;
                 }
+                outputPending = false;
                 return true;
 
             }
@@ -132,7 +134,7 @@
             public boolean outputFinalResult(FrameTupleAppender appender,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                if (!pending) {
+                if (!outputPending) {
                     resultTupleBuilder.reset();
                     for (int i = 0; i < keyFields.length; i++) {
                         resultTupleBuilder.addField(accessor, tIndex,
@@ -160,9 +162,10 @@
                 if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
                         resultTupleBuilder.getByteArray(), 0,
                         resultTupleBuilder.getSize())) {
-                    pending = true;
+                    outputPending = true;
                     return false;
                 }
+                outputPending = false;
                 return true;
             }
 
@@ -170,7 +173,7 @@
             public boolean init(FrameTupleAppender appender,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                if (!pending) {
+                if (!initPending) {
                     stateTupleBuilder.reset();
                     for (int i = 0; i < keyFields.length; i++) {
                         stateTupleBuilder.addField(accessor, tIndex,
@@ -188,14 +191,16 @@
                 }
                 // For pre-cluster: no output state is needed
                 if(appender == null){
+                    initPending = false;
                     return true;
                 }
                 if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
                         stateTupleBuilder.getByteArray(), 0,
                         stateTupleBuilder.getSize())) {
-                    pending = true;
+                    initPending = true;
                     return false;
                 }
+                initPending = false;
                 return true;
             }
 
@@ -261,6 +266,41 @@
                     }
                 }
             }
+
+            @Override
+            public boolean initFromPartial(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!initPending) {
+                    stateTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        stateTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    DataOutput dos = stateTupleBuilder.getDataOutput();
+
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].initFromPartial(accessor, tIndex, dos,
+                                ((AggregateState[]) state.getState())[i]);
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            stateTupleBuilder.addFieldEndOffset();
+                        }
+                    }
+                }
+                // For pre-cluster: no output state is needed
+                if(appender == null){
+                    initPending = false;
+                    return true;
+                }
+                if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
+                        stateTupleBuilder.getByteArray(), 0,
+                        stateTupleBuilder.getSize())) {
+                    initPending = true;
+                    return false;
+                }
+                initPending = false;
+                return true;
+            }
         };
     }
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 6ebf22d..d89709f 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -239,7 +239,7 @@
                         IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
+        int frameLimits = 4;
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
@@ -253,7 +253,7 @@
                         new IntSumFieldAggregatorFactory(3, false) }),
                 new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
                         new IntSumFieldAggregatorFactory(1, false),
-                        new IntSumFieldAggregatorFactory(3, false) }),
+                        new IntSumFieldAggregatorFactory(2, false) }),
                 outputRec,
                 new HashSpillableTableFactory(
                         new FieldHashPartitionComputerFactory(
@@ -408,7 +408,7 @@
                         FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
+        int frameLimits = 4;
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
@@ -423,8 +423,8 @@
                         new AvgFieldAggregatorFactory(1, false) }),
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                         new IntSumFieldAggregatorFactory(1, false),
-                        new CountFieldAggregatorFactory(false),
-                        new AvgFieldAggregatorFactory(2, false) }),
+                        new IntSumFieldAggregatorFactory(2, false),
+                        new AvgFieldAggregatorFactory(3, false) }),
                 outputRec,
                 new HashSpillableTableFactory(
                         new FieldHashPartitionComputerFactory(
@@ -577,7 +577,7 @@
                         UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
+        int frameLimits = 4;
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(