Major changes:
- Based on Yingyi's review comments, the new interface is written to have the frame output logic out of the aggregators.
- Added methods for better management of the size of aggregation states. Now aggregator developer can decide the memory size to be used, and the groupers will assign the memory/frame spaces based on these information.
- Fixed some bugs in multiple-field aggregator. 

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@971 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 275bc56..9bb54db 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -24,6 +24,14 @@
     private static final long serialVersionUID = 1L;
     
     Object state = null;
+    
+    public AggregateState(){
+        state = null;
+    }
+    
+    public AggregateState(Object obj){
+        state = obj;
+    }
 
     public void setState(Object obj) {
         state = null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 971af1a..a0a2dd2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -40,7 +40,6 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
@@ -275,13 +274,14 @@
             }
 
             int[] keyFieldsInPartialResults = new int[keyFields.length];
-            for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+            for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
                 keyFieldsInPartialResults[i] = i;
             }
-            
+
             final IAggregatorDescriptor aggregator = mergerFactory
                     .createAggregator(ctx, recordDescriptors[0],
-                            recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+                            recordDescriptors[0], keyFields,
+                            keyFieldsInPartialResults);
             final AggregateState aggregateState = aggregator
                     .createAggregateStates();
 
@@ -303,6 +303,7 @@
                  * Output frame.
                  */
                 private ByteBuffer outFrame, writerFrame;
+                private int outFrameOffset, writerFrameOffset;
 
                 private LinkedList<RunFileReader> runs;
 
@@ -315,11 +316,9 @@
 
                 private int[] currentFrameIndexInRun;
                 private int[] currentRunFrames;
-                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
-                        ctx.getFrameSize());
+
                 private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
                         ctx.getFrameSize(), recordDescriptors[0]);
-                private FrameTupleAppender writerFrameAppender;
 
                 public void initialize() throws HyracksDataException {
                     aggState = (AggregateActivityState) ctx
@@ -340,7 +339,7 @@
                             runs = new LinkedList<RunFileReader>(runs);
                             inFrames = new ArrayList<ByteBuffer>();
                             outFrame = ctx.allocateFrame();
-                            outFrameAppender.reset(outFrame, true);
+                            outFrameOffset = 0;
                             outFrameAccessor.reset(outFrame);
                             while (runs.size() > 0) {
                                 try {
@@ -427,7 +426,8 @@
                                                 runFileReaders, tupleAccessors,
                                                 topTuples);
                                 } else {
-                                    closeRun(runIndex, runFileReaders, tupleAccessors);
+                                    closeRun(runIndex, runFileReaders,
+                                            tupleAccessors);
                                     break;
                                 }
                             }
@@ -455,16 +455,32 @@
                                  * Initialize the first output record Reset the
                                  * tuple builder
                                  */
-                                if (!aggregator.init(outFrameAppender, fta,
-                                        tupleIndex, aggregateState)) {
+
+                                int stateLength = aggregator
+                                        .getBinaryAggregateStateLength(fta,
+                                                tupleIndex, aggregateState);
+
+                                if (FrameToolsForGroupers.isFrameOverflowing(
+                                        outFrame, stateLength, (outFrameOffset == 0))) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!aggregator.init(outFrameAppender, fta,
-                                            tupleIndex, aggregateState)) {
+                                    if (FrameToolsForGroupers
+                                            .isFrameOverflowing(outFrame,
+                                                    stateLength, (outFrameOffset == 0))) {
                                         throw new HyracksDataException(
-                                                "Failed to append an aggregation result to the output frame.");
+                                                "The partial result is too large to be initialized in a frame.");
                                     }
                                 }
 
+                                aggregator.init(outFrame.array(),
+                                        outFrameOffset, fta, tupleIndex,
+                                        aggregateState);
+
+                                FrameToolsForGroupers
+                                        .updateFrameMetaForNewTuple(outFrame,
+                                                stateLength, (outFrameOffset == 0));
+
+                                outFrameOffset += stateLength;
+
                             } else {
                                 /**
                                  * if new tuple is in the same group of the
@@ -482,12 +498,12 @@
                                     runFileReaders, tupleAccessors, topTuples);
                         }
 
-                        if (outFrameAppender.getTupleCount() > 0) {
+                        if (outFrameOffset > 0) {
                             flushOutFrame(writer, finalPass);
                         }
 
                         aggregator.close();
-                        
+
                         runs.subList(0, runNumber).clear();
                         /**
                          * insert the new run file into the beginning of the run
@@ -507,41 +523,66 @@
                         throws HyracksDataException {
                     if (writerFrame == null) {
                         writerFrame = ctx.allocateFrame();
+                        writerFrameOffset = 0;
                     }
-                    if (writerFrameAppender == null) {
-                        writerFrameAppender = new FrameTupleAppender(
-                                ctx.getFrameSize());
-                        writerFrameAppender.reset(writerFrame, true);
-                    }
+
                     outFrameAccessor.reset(outFrame);
 
                     for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-                        
-                        if(isFinal){
-                            if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+
+                        int outputLen;
+
+                        if (isFinal) {
+
+                            outputLen = aggregator.getFinalOutputLength(
+                                    outFrameAccessor, i, aggregateState);
+
+                            if (FrameToolsForGroupers.isFrameOverflowing(
+                                    writerFrame, outputLen, (writerFrameOffset == 0))) {
                                 FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameAppender.reset(writerFrame, true);
-                                if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                writerFrameOffset = 0;
+                                if (FrameToolsForGroupers.isFrameOverflowing(
+                                        writerFrame, outputLen, (writerFrameOffset == 0))) {
                                     throw new HyracksDataException(
-                                            "Failed to write final aggregation result to a writer frame!");
+                                            "Final aggregation output is too large to be fit into a frame.");
                                 }
                             }
+
+                            aggregator.outputFinalResult(writerFrame.array(),
+                                    writerFrameOffset, outFrameAccessor, i,
+                                    aggregateState);
+
                         } else {
-                            if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+
+                            outputLen = aggregator.getPartialOutputLength(
+                                    outFrameAccessor, i, aggregateState);
+
+                            if (FrameToolsForGroupers.isFrameOverflowing(
+                                    writerFrame, outputLen, (writerFrameOffset == 0))) {
                                 FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameAppender.reset(writerFrame, true);
-                                if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                writerFrameOffset = 0;
+                                if (FrameToolsForGroupers.isFrameOverflowing(
+                                        writerFrame, outputLen, (writerFrameOffset == 0))) {
                                     throw new HyracksDataException(
-                                            "Failed to write final aggregation result to a writer frame!");
+                                            "Final aggregation output is too large to be fit into a frame.");
                                 }
                             }
+
+                            aggregator.outputPartialResult(writerFrame.array(),
+                                    writerFrameOffset, outFrameAccessor, i,
+                                    aggregateState);
                         }
+
+                        FrameToolsForGroupers.updateFrameMetaForNewTuple(
+                                writerFrame, outputLen, (writerFrameOffset == 0));
+
+                        writerFrameOffset += outputLen;
                     }
-                    if (writerFrameAppender.getTupleCount() > 0) {
+                    if (writerFrameOffset > 0) {
                         FrameUtils.flushFrame(writerFrame, writer);
-                        writerFrameAppender.reset(writerFrame, true);
+                        writerFrameOffset = 0;
                     }
-                    outFrameAppender.reset(outFrame, true);
+                    outFrameOffset = 0;
                 }
 
                 private void setNextTopTuple(int runIndex, int[] tupleIndices,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
new file mode 100644
index 0000000..ee29c56
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ *
+ */
+public class FrameToolsForGroupers {
+
+    public static void writeFields(byte[] buf, int offset, int length,
+            ArrayTupleBuilder tupleBuilder) throws HyracksDataException {
+        writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(),
+                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+    }
+
+    public static void writeFields(byte[] buf, int offset, int length,
+            int[] fieldsOffset, byte[] data, int dataOffset, int dataLength)
+            throws HyracksDataException {
+        if (dataLength + 4 * fieldsOffset.length > length) {
+            throw new HyracksDataException(
+                    "Out of buffer bound: try to write too much data ("
+                            + dataLength + ") to the given bound (" + length
+                            + ").");
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
+        for (int i = 0; i < fieldsOffset.length; i++) {
+            buffer.putInt(fieldsOffset[i]);
+        }
+        buffer.put(data, dataOffset, dataLength);
+    }
+
+    public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
+            int addedTupleLength) throws HyracksDataException {
+        int currentTupleCount = buffer.getInt(FrameHelper
+                .getTupleCountOffset(buffer.capacity()));
+        int currentTupleEndOffset = buffer
+                .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                        * currentTupleCount);
+        int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
+
+        // update tuple end offset
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                * (currentTupleCount + 1), newTupleEndOffset);
+        // Update the tuple count
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
+                currentTupleCount + 1);
+    }
+
+    public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
+            int addedTupleLength, boolean isReset) throws HyracksDataException {
+        int currentTupleCount;
+        int currentTupleEndOffset;
+        if (isReset) {
+            currentTupleCount = 0;
+            currentTupleEndOffset = 0;
+        } else {
+            currentTupleCount = buffer.getInt(FrameHelper
+                    .getTupleCountOffset(buffer.capacity()));
+            currentTupleEndOffset = buffer.getInt(FrameHelper
+                    .getTupleCountOffset(buffer.capacity())
+                    - 4
+                    * currentTupleCount);
+        }
+        int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
+
+        // update tuple end offset
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                * (currentTupleCount + 1), newTupleEndOffset);
+        // Update the tuple count
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
+                currentTupleCount + 1);
+    }
+
+    public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
+            throws HyracksDataException {
+        
+        int currentTupleCount = buffer.getInt(FrameHelper
+                .getTupleCountOffset(buffer.capacity()));
+        if(currentTupleCount == 0 || isReset){
+            return length + 4 + 4 > buffer.capacity();
+        }
+        int currentTupleEndOffset = buffer
+                .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                        * currentTupleCount);
+        return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer
+                .capacity();
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index b1af426..853ae36 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 
 class GroupingHashTable {
@@ -62,7 +61,7 @@
 
     private static final int INIT_AGG_STATE_SIZE = 8;
     private final IHyracksTaskContext ctx;
-    private final FrameTupleAppender appender;
+
     private final List<ByteBuffer> buffers;
     private final Link[] table;
     /**
@@ -79,6 +78,9 @@
     private final ITuplePartitionComputer tpc;
     private final IAggregatorDescriptor aggregator;
 
+    private int bufferOffset;
+    private int tupleCountInBuffer;
+
     private final FrameTupleAccessor storedKeysAccessor;
 
     GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
@@ -89,7 +91,7 @@
             RecordDescriptor outRecordDescriptor, int tableSize)
             throws HyracksDataException {
         this.ctx = ctx;
-        appender = new FrameTupleAppender(ctx.getFrameSize());
+
         buffers = new ArrayList<ByteBuffer>();
         table = new Link[tableSize];
 
@@ -109,12 +111,13 @@
         tpc = tpcf.createPartitioner();
 
         int[] keyFieldsInPartialResults = new int[fields.length];
-        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
             keyFieldsInPartialResults[i] = i;
         }
-        
+
         this.aggregator = aggregatorFactory.createAggregator(ctx,
-                inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
+                inRecordDescriptor, outRecordDescriptor, fields,
+                keyFieldsInPartialResults);
 
         this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
         accumulatorSize = 0;
@@ -132,19 +135,11 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
         buffers.add(buffer);
-        appender.reset(buffer, true);
+        bufferOffset = 0;
+        tupleCountInBuffer = 0;
         ++lastBIndex;
     }
 
-    private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
-            throws HyracksDataException {
-        ByteBuffer frame = appender.getBuffer();
-        frame.position(0);
-        frame.limit(frame.capacity());
-        writer.nextFrame(appender.getBuffer());
-        appender.reset(appender.getBuffer(), true);
-    }
-
     void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
         int entry = tpc.partition(accessor, tIndex, table.length);
         Link link = table[entry];
@@ -167,22 +162,32 @@
             saIndex = accumulatorSize++;
             // Add keys
 
-            // Add index to the keys in frame
-            int sbIndex = lastBIndex;
-            int stIndex = appender.getTupleCount();
-            
             // Add aggregation fields
             AggregateState newState = aggregator.createAggregateStates();
-            
-            if(!aggregator.init(appender, accessor, tIndex, newState)){
+
+            int initLength = aggregator.getBinaryAggregateStateLength(accessor,
+                    tIndex, newState);
+
+            if (FrameToolsForGroupers.isFrameOverflowing(
+                    buffers.get(lastBIndex), initLength, (bufferOffset == 0))) {
                 addNewBuffer();
-                sbIndex = lastBIndex;
-                stIndex = appender.getTupleCount();
-                if(!aggregator.init(appender, accessor, tIndex, newState)){
-                    throw new IllegalStateException();
+                if (bufferOffset + initLength > ctx.getFrameSize()) {
+                    throw new HyracksDataException(
+                            "Cannot initialize the aggregation state within a frame.");
                 }
             }
-            
+
+            aggregator.init(buffers.get(lastBIndex).array(), bufferOffset,
+                    accessor, tIndex, newState);
+
+            FrameToolsForGroupers.updateFrameMetaForNewTuple(
+                    buffers.get(lastBIndex), initLength, (bufferOffset == 0));
+
+            bufferOffset += initLength;
+
+            // Update tuple count in frame
+            tupleCountInBuffer++;
+
             if (accumulatorSize >= aggregateStates.length) {
                 aggregateStates = Arrays.copyOf(aggregateStates,
                         aggregateStates.length * 2);
@@ -190,7 +195,8 @@
 
             aggregateStates[saIndex] = newState;
 
-            link.add(sbIndex, stIndex, saIndex);
+            link.add(lastBIndex, tupleCountInBuffer - 1, saIndex);
+
         } else {
             aggregator.aggregate(accessor, tIndex, null, 0,
                     aggregateStates[saIndex]);
@@ -199,7 +205,8 @@
 
     void write(IFrameWriter writer) throws HyracksDataException {
         ByteBuffer buffer = ctx.allocateFrame();
-        appender.reset(buffer, true);
+        int bufOffset = 0;
+
         for (int i = 0; i < table.length; ++i) {
             Link link = table[i];
             if (link != null) {
@@ -210,21 +217,42 @@
                     ByteBuffer keyBuffer = buffers.get(bIndex);
                     storedKeysAccessor.reset(keyBuffer);
 
-                    while (!aggregator
-                            .outputFinalResult(appender, storedKeysAccessor,
-                                    tIndex, aggregateStates[aIndex])) {
-                        flushFrame(appender, writer);
+                    int outputLen = aggregator
+                            .getFinalOutputLength(storedKeysAccessor, tIndex,
+                                    aggregateStates[aIndex]);
+
+                    if (FrameToolsForGroupers.isFrameOverflowing(buffer,
+                            outputLen, (bufOffset == 0))) {
+                        writer.nextFrame(buffer);
+                        bufOffset = 0;
+                        if (FrameToolsForGroupers.isFrameOverflowing(buffer,
+                                outputLen, (bufOffset == 0))) {
+                            throw new HyracksDataException(
+                                    "Cannot write aggregation output in a frame.");
+                        }
                     }
+
+                    aggregator
+                            .outputFinalResult(buffer.array(), bufOffset,
+                                    storedKeysAccessor, tIndex,
+                                    aggregateStates[aIndex]);
+
+                    FrameToolsForGroupers.updateFrameMetaForNewTuple(buffer,
+                            outputLen, (bufOffset == 0));
+
+                    bufOffset += outputLen;
+
                 }
             }
         }
-        if (appender.getTupleCount() != 0) {
-            flushFrame(appender, writer);
+        if (bufOffset != 0) {
+            writer.nextFrame(buffer);
+            bufOffset = 0;
         }
     }
-    
+
     void close() throws HyracksDataException {
-        for(AggregateState aState : aggregateStates){
+        for (AggregateState aState : aggregateStates) {
             aState.close();
         }
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index af68afb..5cff298 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -31,7 +31,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -116,19 +115,17 @@
                 keyFields, storedKeys, comparators);
         final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
                 storedKeys, storedKeys, comparators);
-        final FrameTupleAppender appender = new FrameTupleAppender(
-                ctx.getFrameSize());
+
         final ITuplePartitionComputer tpc = tpcf.createPartitioner();
-        final ByteBuffer outFrame = ctx.allocateFrame();
 
         final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
                 : firstKeyNormalizerFactory.createNormalizedKeyComputer();
 
         int[] keyFieldsInPartialResults = new int[keyFields.length];
-        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
             keyFieldsInPartialResults[i] = i;
         }
-        
+
         final IAggregatorDescriptor aggregator = aggregateFactory
                 .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
                         keyFields, keyFieldsInPartialResults);
@@ -138,7 +135,12 @@
 
         return new ISpillableTable() {
 
-            private int dataFrameCount;
+            private int lastBufIndex;
+            private int outFrameOffset;
+            private int tupleCountInOutFrame;
+
+            private ByteBuffer outputFrame;
+
             private final ISerializableTable table = new SerializableHashTable(
                     tableSize, ctx);
             private final TuplePointer storedTuplePointer = new TuplePointer();
@@ -197,7 +199,7 @@
 
             @Override
             public void reset() {
-                dataFrameCount = -1;
+                lastBufIndex = -1;
                 tPointers = null;
                 table.reset();
                 aggregator.reset();
@@ -206,7 +208,7 @@
             @Override
             public boolean insert(FrameTupleAccessor accessor, int tIndex)
                     throws HyracksDataException {
-                if (dataFrameCount < 0)
+                if (lastBufIndex < 0)
                     nextAvailableFrame();
                 int entry = tpc.partition(accessor, tIndex, tableSize);
                 boolean foundGroup = false;
@@ -227,21 +229,27 @@
 
                 if (!foundGroup) {
 
-                    if (!aggregator.init(appender, accessor, tIndex,
-                            aggregateState)) {
+                    int initLen = aggregator.getBinaryAggregateStateLength(
+                            accessor, tIndex, aggregateState);
+
+                    if (FrameToolsForGroupers.isFrameOverflowing(
+                            frames.get(lastBufIndex), initLen, (outFrameOffset == 0))) {
                         if (!nextAvailableFrame()) {
                             return false;
-                        } else {
-                            if (!aggregator.init(appender, accessor, tIndex,
-                                    aggregateState)) {
-                                throw new HyracksDataException(
-                                        "Failed to init an aggregator");
-                            }
                         }
                     }
 
-                    storedTuplePointer.frameIndex = dataFrameCount;
-                    storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+                    aggregator.init(frames.get(lastBufIndex).array(),
+                            outFrameOffset, accessor, tIndex, aggregateState);
+
+                    FrameToolsForGroupers.updateFrameMetaForNewTuple(
+                            frames.get(lastBufIndex), initLen, (outFrameOffset == 0));
+
+                    outFrameOffset += initLen;
+                    tupleCountInOutFrame++;
+
+                    storedTuplePointer.frameIndex = lastBufIndex;
+                    storedTuplePointer.tupleIndex = tupleCountInOutFrame - 1;
                     table.insert(entry, storedTuplePointer);
                 } else {
 
@@ -259,16 +267,19 @@
 
             @Override
             public int getFrameCount() {
-                return dataFrameCount;
+                return lastBufIndex;
             }
 
             @Override
             public void flushFrames(IFrameWriter writer, boolean isPartial)
                     throws HyracksDataException {
-                FrameTupleAppender appender = new FrameTupleAppender(
-                        ctx.getFrameSize());
+                if (outputFrame == null) {
+                    outputFrame = ctx.allocateFrame();
+                }
+
+                int outputFrameOffset = 0;
                 writer.open();
-                appender.reset(outFrame, true);
+
                 if (tPointers == null) {
                     // Not sorted
                     for (int i = 0; i < tableSize; ++i) {
@@ -284,25 +295,64 @@
 
                             storedKeysAccessor1.reset(frames.get(bIndex));
 
+                            int outputLen;
+
                             if (isPartial) {
-                                while (!aggregator.outputPartialResult(
-                                        appender, storedKeysAccessor1, tIndex,
-                                        aggregateState)) {
-                                    FrameUtils.flushFrame(outFrame, writer);
-                                    appender.reset(outFrame, true);
-                                }
-                            } else {
-                                while (!aggregator.outputFinalResult(appender,
+
+                                outputLen = aggregator.getPartialOutputLength(
                                         storedKeysAccessor1, tIndex,
-                                        aggregateState)) {
-                                    FrameUtils.flushFrame(outFrame, writer);
-                                    appender.reset(outFrame, true);
+                                        aggregateState);
+
+                                if (FrameToolsForGroupers.isFrameOverflowing(
+                                        outputFrame, outputLen, (outputFrameOffset == 0))) {
+                                    FrameUtils.flushFrame(outputFrame, writer);
+                                    outputFrameOffset = 0;
+                                    if (FrameToolsForGroupers
+                                            .isFrameOverflowing(outputFrame,
+                                                    outputLen, (outputFrameOffset == 0))) {
+                                        throw new HyracksDataException(
+                                                "The output item is too large to be fit into a frame.");
+                                    }
                                 }
+
+                                aggregator.outputPartialResult(
+                                        outputFrame.array(), outputFrameOffset,
+                                        storedKeysAccessor1, tIndex,
+                                        aggregateState);
+
+                            } else {
+                                outputLen = aggregator.getFinalOutputLength(
+                                        storedKeysAccessor1, tIndex,
+                                        aggregateState);
+
+                                if (FrameToolsForGroupers.isFrameOverflowing(
+                                        outputFrame, outputLen, (outputFrameOffset == 0))) {
+                                    FrameUtils.flushFrame(outputFrame, writer);
+                                    outputFrameOffset = 0;
+                                    if (FrameToolsForGroupers
+                                            .isFrameOverflowing(outputFrame,
+                                                    outputLen, (outputFrameOffset == 0))) {
+                                        throw new HyracksDataException(
+                                                "The output item is too large to be fit into a frame.");
+                                    }
+                                }
+
+                                aggregator.outputFinalResult(
+                                        outputFrame.array(), outputFrameOffset,
+                                        storedKeysAccessor1, tIndex,
+                                        aggregateState);
                             }
+
+                            FrameToolsForGroupers.updateFrameMetaForNewTuple(
+                                    outputFrame, outputLen, (outputFrameOffset == 0));
+
+                            outputFrameOffset += outputLen;
+
                         } while (true);
                     }
-                    if (appender.getTupleCount() != 0) {
-                        FrameUtils.flushFrame(outFrame, writer);
+                    if (outputFrameOffset > 0) {
+                        FrameUtils.flushFrame(outputFrame, writer);
+                        outputFrameOffset = 0;
                     }
                     aggregator.close();
                     return;
@@ -319,45 +369,65 @@
                     ByteBuffer buffer = frames.get(frameIndex);
                     storedKeysAccessor1.reset(buffer);
 
+                    int outputLen;
+
                     if (isPartial) {
-                        if (!aggregator
-                                .outputPartialResult(appender,
-                                        storedKeysAccessor1, tupleIndex,
-                                        aggregateState)) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            appender.reset(outFrame, true);
-                            if (!aggregator.outputPartialResult(appender,
-                                    storedKeysAccessor1, tupleIndex,
-                                    aggregateState)) {
+
+                        outputLen = aggregator
+                                .getPartialOutputLength(storedKeysAccessor1,
+                                        tupleIndex, aggregateState);
+
+                        if (FrameToolsForGroupers.isFrameOverflowing(
+                                outputFrame, outputLen, (outputFrameOffset == 0))) {
+                            FrameUtils.flushFrame(outputFrame, writer);
+                            outputFrameOffset = 0;
+                            if (FrameToolsForGroupers.isFrameOverflowing(
+                                    outputFrame, outputLen, (outputFrameOffset == 0))) {
                                 throw new HyracksDataException(
-                                        "Failed to output partial result.");
+                                        "The output item is too large to be fit into a frame.");
                             }
                         }
+
+                        aggregator.outputPartialResult(outputFrame.array(),
+                                outputFrameOffset, storedKeysAccessor1,
+                                tupleIndex, aggregateState);
+
                     } else {
-                        if (!aggregator
-                                .outputFinalResult(appender,
-                                        storedKeysAccessor1, tupleIndex,
-                                        aggregateState)) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            appender.reset(outFrame, true);
-                            if (!aggregator.outputFinalResult(appender,
-                                    storedKeysAccessor1, tupleIndex,
-                                    aggregateState)) {
+                        outputLen = aggregator
+                                .getFinalOutputLength(storedKeysAccessor1,
+                                        tupleIndex, aggregateState);
+
+                        if (FrameToolsForGroupers.isFrameOverflowing(
+                                outputFrame, outputLen, (outputFrameOffset == 0))) {
+                            FrameUtils.flushFrame(outputFrame, writer);
+                            outputFrameOffset = 0;
+                            if (FrameToolsForGroupers.isFrameOverflowing(
+                                    outputFrame, outputLen, (outputFrameOffset == 0))) {
                                 throw new HyracksDataException(
-                                        "Failed to output partial result.");
+                                        "The output item is too large to be fit into a frame.");
                             }
                         }
+
+                        aggregator.outputFinalResult(outputFrame.array(),
+                                outputFrameOffset, storedKeysAccessor1,
+                                tupleIndex, aggregateState);
                     }
+
+                    FrameToolsForGroupers.updateFrameMetaForNewTuple(
+                            outputFrame, outputLen, (outputFrameOffset == 0));
+
+                    outputFrameOffset += outputLen;
                 }
-                if (appender.getTupleCount() > 0) {
-                    FrameUtils.flushFrame(outFrame, writer);
+                if (outputFrameOffset > 0) {
+                    FrameUtils.flushFrame(outputFrame, writer);
+                    outputFrameOffset = 0;
                 }
                 aggregator.close();
             }
 
             @Override
             public void close() {
-                dataFrameCount = -1;
+                lastBufIndex = -1;
                 tPointers = null;
                 table.close();
                 frames.clear();
@@ -374,7 +444,7 @@
              */
             private boolean nextAvailableFrame() {
                 // Return false if the number of frames is equal to the limit.
-                if (dataFrameCount + 1 >= framesLimit)
+                if (lastBufIndex + 1 >= framesLimit)
                     return false;
 
                 if (frames.size() < framesLimit) {
@@ -383,15 +453,17 @@
                     frame.position(0);
                     frame.limit(frame.capacity());
                     frames.add(frame);
-                    appender.reset(frame, true);
-                    dataFrameCount = frames.size() - 1;
+                    outFrameOffset = 0;
+                    tupleCountInOutFrame = 0;
+                    lastBufIndex = frames.size() - 1;
                 } else {
                     // Reuse an old frame
-                    dataFrameCount++;
-                    ByteBuffer frame = frames.get(dataFrameCount);
+                    lastBufIndex++;
+                    ByteBuffer frame = frames.get(lastBufIndex);
                     frame.position(0);
                     frame.limit(frame.capacity());
-                    appender.reset(frame, true);
+                    outFrameOffset = 0;
+                    tupleCountInOutFrame = 0;
                 }
                 return true;
             }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
deleted file mode 100644
index 858c760..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.Serializable;
-
-/**
- *
- */
-public interface IAggregateStateFactory extends Serializable {
-    
-    /**
-     * Get the (partial) state length in binary. 
-     * 
-     * @return
-     */
-    public int getStateLength();
-    
-    public Object createState();
-    
-    public boolean hasBinaryState();
-    
-    public boolean hasObjectState();
-    
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index f3cf2e7..358bf42 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -16,7 +16,6 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 /**
  *
@@ -29,13 +28,20 @@
      * @return
      */
     public AggregateState createAggregateStates();
-    
+
     /**
      * Get the length of the binary states.
      * 
      * @return
      */
-    public int getAggregateStatesLength();
+    public int getBinaryAggregateStateLength(IFrameTupleAccessor accessor,
+            int tIndex, AggregateState state) throws HyracksDataException;
+
+    public int getPartialOutputLength(IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException;
+
+    public int getFinalOutputLength(IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException;
 
     /**
      * Initialize the state based on the input tuple.
@@ -49,9 +55,8 @@
      *            The state to be initialized.
      * @throws HyracksDataException
      */
-    public boolean init(FrameTupleAppender appender,
-            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException;
+    public void init(byte[] buf, int offset, IFrameTupleAccessor accessor,
+            int tIndex, AggregateState state) throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
@@ -93,7 +98,7 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public boolean outputPartialResult(FrameTupleAppender appender,
+    public void outputPartialResult(byte[] data, int offset,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
 
@@ -109,7 +114,7 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public boolean outputFinalResult(FrameTupleAppender appender,
+    public void outputFinalResult(byte[] data, int offset,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 3cf3d34..1471318 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -23,22 +23,20 @@
  *
  */
 public interface IFieldAggregateDescriptor {
-    
-    public IAggregateStateFactory getAggregateStateFactory();
 
     /**
-     * Initialize the state based on the input tuple. 
+     * Initialize the state based on the input tuple.
      * 
      * @param accessor
      * @param tIndex
      * @param fieldOutput
      *            The data output for the frame containing the state. This may
-     *            be null, if the state is maintained as a java object. 
-     *            
-     *            Note that we have an assumption that the initialization of
-     *            the binary state (if any) inserts the state fields into the
-     *            buffer in a appending fashion. This means that an arbitrary
-     *            initial size of the state can be accquired.
+     *            be null, if the state is maintained as a java object.
+     * 
+     *            Note that we have an assumption that the initialization of the
+     *            binary state (if any) inserts the state fields into the buffer
+     *            in a appending fashion. This means that an arbitrary initial
+     *            size of the state can be accquired.
      * @param state
      *            The state to be initialized.
      * @throws HyracksDataException
@@ -65,11 +63,12 @@
      * @param data
      *            The buffer containing the state, if frame-based-state is used.
      *            This means that it can be null if java-object-based-state is
-     *            used. 
-     *            
+     *            used.
+     * 
      *            Here the length of binary state can be obtains from the state
      *            parameter, and if the content to be filled into that is over-
-     *            flowing (larger than the reversed space), error should be emit.
+     *            flowing (larger than the reversed space), error should be
+     *            emit.
      * @param offset
      * @param state
      *            The aggregate state.
@@ -109,6 +108,24 @@
     public void outputFinalResult(DataOutput fieldOutput, byte[] data,
             int offset, AggregateState state) throws HyracksDataException;
 
+    public boolean needsBinaryState();
+
+    public boolean needsObjectState();
+
+    public int getBinaryStateLength(IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException;
+
+    public int getPartialResultLength(byte[] data, int offset,
+            AggregateState state) throws HyracksDataException;
+
+    public int getFinalResultLength(byte[] data, int offset,
+            AggregateState state) throws HyracksDataException;
+
+    public AggregateState createState();
+
+    /**
+     * Close the field aggregator
+     */
     public void close();
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 3f2efaf..17f62f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PreclusteredGroupWriter implements IFrameWriter {
@@ -34,24 +33,29 @@
     private final ByteBuffer copyFrame;
     private final FrameTupleAccessor inFrameAccessor;
     private final FrameTupleAccessor copyFrameAccessor;
+
     private final ByteBuffer outFrame;
-    private final FrameTupleAppender appender;
+    private int outFrameOffset;
+
     private boolean first;
 
-    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
+            IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
+            RecordDescriptor inRecordDesc, IFrameWriter writer) {
         this.groupFields = groupFields;
         this.comparators = comparators;
         this.aggregator = aggregator;
         this.aggregateState = aggregator.createAggregateStates();
         this.writer = writer;
         copyFrame = ctx.allocateFrame();
-        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                inRecordDesc);
         copyFrameAccessor.reset(copyFrame);
+
         outFrame = ctx.allocateFrame();
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(outFrame, true);
+        outFrameOffset = 0;
     }
 
     @Override
@@ -66,49 +70,78 @@
         int nTuples = inFrameAccessor.getTupleCount();
         for (int i = 0; i < nTuples; ++i) {
             if (first) {
-                aggregator.init(null, inFrameAccessor, i, aggregateState);
+
+                aggregator.init(null, 0, inFrameAccessor, i, aggregateState);
+
                 first = false;
+
             } else {
                 if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                    switchGroupIfRequired(copyFrameAccessor,
+                            copyFrameAccessor.getTupleCount() - 1,
+                            inFrameAccessor, i);
                 } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                    switchGroupIfRequired(inFrameAccessor, i - 1,
+                            inFrameAccessor, i);
                 }
-                
+
             }
         }
         FrameUtils.copy(buffer, copyFrame);
     }
 
-    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor,
+            int prevTupleIndex, FrameTupleAccessor currTupleAccessor,
+            int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor,
+                currTupleIndex)) {
             writeOutput(prevTupleAccessor, prevTupleIndex);
-            aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+
+            aggregator.init(null, 0, currTupleAccessor, currTupleIndex,
+                    aggregateState);
         } else {
-            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
+                    aggregateState);
         }
     }
 
-    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
-            throws HyracksDataException {
-        if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
-            FrameUtils.flushFrame(appender.getBuffer(), writer);
-            appender.reset(appender.getBuffer(), true);
-            if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
-                throw new IllegalStateException();
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
+            int lastTupleIndex) throws HyracksDataException {
+
+        int outLen = aggregator.getFinalOutputLength(lastTupleAccessor,
+                lastTupleIndex, aggregateState);
+
+        if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+            FrameUtils.flushFrame(outFrame, writer);
+            outFrameOffset = 0;
+            if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+                throw new HyracksDataException(
+                        "The output cannot be fit into a frame.");
             }
         }
+
+        aggregator.outputFinalResult(outFrame.array(), outFrameOffset,
+                lastTupleAccessor, lastTupleIndex, aggregateState);
+
+        FrameToolsForGroupers.updateFrameMetaForNewTuple(outFrame, outLen,
+                (outFrameOffset == 0));
+
+        outFrameOffset += outLen;
+
     }
 
-    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
+            FrameTupleAccessor a2, int t2Idx) {
         for (int i = 0; i < comparators.length; ++i) {
             int fIdx = groupFields[i];
-            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
+                    + a1.getFieldStartOffset(t1Idx, fIdx);
             int l1 = a1.getFieldLength(t1Idx, fIdx);
-            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
+                    + a2.getFieldStartOffset(t2Idx, fIdx);
             int l2 = a2.getFieldLength(t2Idx, fIdx);
-            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2
+                    .getBuffer().array(), s2, l2) != 0) {
                 return false;
             }
         }
@@ -123,9 +156,10 @@
     @Override
     public void close() throws HyracksDataException {
         if (!first) {
-            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(appender.getBuffer(), writer);
+            writeOutput(copyFrameAccessor,
+                    copyFrameAccessor.getTupleCount() - 1);
+            if (outFrameOffset > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
             }
         }
         aggregateState.close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 32a7f49..f68f683 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -158,32 +157,43 @@
             }
 
             @Override
-            public IAggregateStateFactory getAggregateStateFactory() {
-                return new IAggregateStateFactory() {
-                    
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public boolean hasObjectState() {
-                        return useObjectState;
-                    }
-                    
-                    @Override
-                    public boolean hasBinaryState() {
-                        return !useObjectState;
-                    }
-                    
-                    @Override
-                    public int getStateLength() {
-                        return 8;
-                    }
-                    
-                    @Override
-                    public Object createState() {
-                        return new Integer[]{0, 0};
-                    }
-                };
+            public boolean needsObjectState() {
+                return useObjectState;
             }
+            
+            @Override
+            public boolean needsBinaryState() {
+                return !useObjectState;
+            }
+            
+            @Override
+            public AggregateState createState() {
+                return new AggregateState(new Integer[]{0, 0});
+            }
+
+            @Override
+            public int getBinaryStateLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                if(useObjectState){
+                    return 0;
+                } else {
+                    return 8;
+                }
+            }
+
+            @Override
+            public int getPartialResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                return 8;
+            }
+
+            @Override
+            public int getFinalResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                return 4;
+            }
+
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index 8f43dec..f463119 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -133,34 +132,44 @@
             }
 
             @Override
-            public IAggregateStateFactory getAggregateStateFactory() {
-                return new IAggregateStateFactory() {
-                    
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public boolean hasObjectState() {
-                        return useObjectState;
-                    }
-                    
-                    @Override
-                    public boolean hasBinaryState() {
-                        return !useObjectState;
-                    }
-                    
-                    @Override
-                    public int getStateLength() {
-                        return 8;
-                    }
-                    
-                    @Override
-                    public Object createState() {
-                        return new Integer[]{0, 0};
-                    }
-                };
+            public boolean needsObjectState() {
+                return useObjectState;
+            }
+            
+            @Override
+            public boolean needsBinaryState() {
+                return !useObjectState;
             }
 
             @Override
+            public int getBinaryStateLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                if(useObjectState){
+                    return 0;
+                } else {
+                    return 8;
+                }
+            }
+
+            @Override
+            public int getPartialResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                return 8;
+            }
+
+            @Override
+            public int getFinalResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                return 4;
+            }
+            
+            @Override
+            public AggregateState createState() {
+                return new AggregateState(new Integer[]{0, 0});
+            }
+            
+            @Override
             public void init(IFrameTupleAccessor accessor,
                     int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 430284a..a4214a7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -35,29 +34,36 @@
         IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
-    
+
     private final boolean useObjectState;
-    
-    public CountFieldAggregatorFactory(boolean useObjectState){
+
+    public CountFieldAggregatorFactory(boolean useObjectState) {
         this.useObjectState = useObjectState;
     }
 
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
     public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         return new IFieldAggregateDescriptor() {
-            
+
             @Override
             public void reset() {
             }
-            
+
             @Override
-            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputPartialResult(DataOutput fieldOutput,
+                    byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 int count;
                 if (!useObjectState) {
                     count = IntegerSerializerDeserializer.getInt(data, offset);
@@ -71,10 +77,11 @@
                             "I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+                    int offset, AggregateState state)
+                    throws HyracksDataException {
                 int count;
                 if (!useObjectState) {
                     count = IntegerSerializerDeserializer.getInt(data, offset);
@@ -88,7 +95,7 @@
                             "I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
             public void init(IFrameTupleAccessor accessor, int tIndex,
                     DataOutput fieldOutput, AggregateState state)
@@ -105,40 +112,24 @@
                     state.setState(count);
                 }
             }
-            
-            @Override
-            public IAggregateStateFactory getAggregateStateFactory() {
-                return new IAggregateStateFactory() {
-                    
-                    private static final long serialVersionUID = 1L;
 
-                    @Override
-                    public boolean hasObjectState() {
-                        return useObjectState;
-                    }
-                    
-                    @Override
-                    public boolean hasBinaryState() {
-                        return !useObjectState;
-                    }
-                    
-                    @Override
-                    public int getStateLength() {
-                        return 4;
-                    }
-                    
-                    @Override
-                    public Object createState() {
-                        return new Integer(0);
-                    }
-                };
+            public boolean needsObjectState() {
+                return useObjectState;
             }
-            
+
+            public boolean needsBinaryState() {
+                return !useObjectState;
+            }
+
+            public AggregateState createState() {
+                return new AggregateState(new Integer(0));
+            }
+
             @Override
             public void close() {
-                
+
             }
-            
+
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
                     byte[] data, int offset, AggregateState state)
@@ -153,6 +144,26 @@
                     state.setState(count);
                 }
             }
+
+            @Override
+            public int getBinaryStateLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) {
+                if(useObjectState)
+                    return 0;
+                return 4;
+            }
+
+            @Override
+            public int getPartialResultLength(byte[] data, int offset,
+                    AggregateState state) {
+                return 4;
+            }
+
+            @Override
+            public int getFinalResultLength(byte[] data, int offset,
+                    AggregateState state) {
+                return 4;
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 9c5062f..43de9c6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -37,7 +36,7 @@
     private static final long serialVersionUID = 1L;
 
     private final int aggField;
-    
+
     private final boolean useObjectState;
 
     public IntSumFieldAggregatorFactory(int aggField, boolean useObjState) {
@@ -109,12 +108,12 @@
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                
+
                 sum += IntegerSerializerDeserializer.getInt(accessor
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
-                
+
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
@@ -127,32 +126,16 @@
                 }
             }
 
-            @Override
-            public IAggregateStateFactory getAggregateStateFactory(){
-                return new IAggregateStateFactory() {
-                    
-                    private static final long serialVersionUID = 1L;
+            public boolean needsObjectState() {
+                return useObjectState;
+            }
 
-                    @Override
-                    public boolean hasObjectState() {
-                        return useObjectState;
-                    }
-                    
-                    @Override
-                    public boolean hasBinaryState() {
-                        return !useObjectState;
-                    }
-                    
-                    @Override
-                    public int getStateLength() {
-                        return 4;
-                    }
-                    
-                    @Override
-                    public Object createState() {
-                        return new Integer(0);
-                    }
-                };
+            public boolean needsBinaryState() {
+                return !useObjectState;
+            }
+
+            public AggregateState createState() {
+                return new AggregateState(new Integer(0));
             }
 
             @Override
@@ -171,7 +154,7 @@
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
-                
+
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
                     sum += buf.getInt(offset);
@@ -181,6 +164,26 @@
                     state.setState(sum);
                 }
             }
+
+            @Override
+            public int getBinaryStateLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) {
+                if (useObjectState)
+                    return 0;
+                return 4;
+            }
+
+            @Override
+            public int getPartialResultLength(byte[] data, int offset,
+                    AggregateState state) {
+                return 4;
+            }
+
+            @Override
+            public int getFinalResultLength(byte[] data, int offset,
+                    AggregateState state) {
+                return 4;
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 9152093..35aa967 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -204,34 +203,92 @@
                 }
             }
 
-            @Override
-            public IAggregateStateFactory getAggregateStateFactory() {
-                return new IAggregateStateFactory() {
-
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public boolean hasObjectState() {
-                        return true;
-                    }
-
-                    @Override
-                    public boolean hasBinaryState() {
-                        return hasBinaryState;
-                    }
-
-                    @Override
-                    public int getStateLength() {
-                        return 4;
-                    }
-
-                    @Override
-                    public Object createState() {
-                        return null;
-                    }
-                };
+            public boolean needsObjectState() {
+                return true;
             }
+
+            public boolean needsBinaryState() {
+                return hasBinaryState;
+            }
+
+            public AggregateState createState() {
+                return new AggregateState();
+            }
+
+            @Override
+            public int getBinaryStateLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) {
+                int len = 0;
+
+                if (hasBinaryState) {
+                    len = 4;
+                }
+
+                return len;
+            }
+
+            @Override
+            public int getPartialResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                int len = 0;
+
+                if (hasBinaryState) {
+                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
+                            offset);
+
+                    Object[] storedState = (Object[]) state.getState();
+
+                    len = getUTFLength(((String) (storedState[stateIdx])));
+                } else {
+                    len = getUTFLength((String) (state.getState()));
+                }
+
+                return len;
+            }
+
+            @Override
+            public int getFinalResultLength(byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
+                int len = 0;
+
+                if (hasBinaryState) {
+                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
+                            offset);
+
+                    Object[] storedState = (Object[]) state.getState();
+
+                    len = getUTFLength(((String) (storedState[stateIdx])));
+                } else {
+                    len = getUTFLength((String) (state.getState()));
+                }
+
+                return len;
+            }
+
+            private int getUTFLength(String str) throws HyracksDataException {
+                int utflen = 0;
+                int c = 0;
+
+                /* use charAt instead of copying String to char array */
+                for (int i = 0; i < str.length(); i++) {
+                    c = str.charAt(i);
+                    if ((c >= 0x0001) && (c <= 0x007F)) {
+                        utflen++;
+                    } else if (c > 0x07FF) {
+                        utflen += 3;
+                    } else {
+                        utflen += 2;
+                    }
+                }
+
+                if (utflen > 65535) {
+                    throw new HyracksDataException("encoded string too long: "
+                            + utflen + " bytes");
+                }
+
+                return utflen + 2;
+            }
+
         };
     }
-
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 0a002da..27ad066 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,9 +21,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.FrameToolsForGroupers;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
@@ -37,7 +36,14 @@
 
     private static final long serialVersionUID = 1L;
     private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+    private int[] keys;
 
+    public MultiFieldsAggregatorFactory(int[] keys, 
+            IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+        this.keys = keys;
+        this.aggregatorFactories = aggregatorFactories;
+    }
+    
     public MultiFieldsAggregatorFactory(
             IFieldAggregateDescriptorFactory[] aggregatorFactories) {
         this.aggregatorFactories = aggregatorFactories;
@@ -55,21 +61,22 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] keyFields,
+            final int[] keyFieldsInPartialResults) throws HyracksDataException {
 
         final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
-        final IAggregateStateFactory[] aggregateStateFactories = new IAggregateStateFactory[aggregators.length];
         for (int i = 0; i < aggregators.length; i++) {
             aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
                     inRecordDescriptor, outRecordDescriptor);
-            aggregateStateFactories[i] = aggregators[i]
-                    .getAggregateStateFactory();
         }
-
-        int stateTupleFieldCount = keyFields.length;
-        for (int i = 0; i < aggregateStateFactories.length; i++) {
-            if (aggregateStateFactories[i].hasBinaryState()) {
+        
+        if(this.keys == null){
+            this.keys = keyFields;
+        }
+        
+        int stateTupleFieldCount = keys.length;
+        for (int i = 0; i < aggregators.length; i++) {
+            if (aggregators[i].needsBinaryState()) {
                 stateTupleFieldCount++;
             }
         }
@@ -82,155 +89,204 @@
 
         return new IAggregatorDescriptor() {
 
-            private boolean initPending, outputPending;
-
             @Override
             public void reset() {
                 for (int i = 0; i < aggregators.length; i++) {
                     aggregators[i].reset();
-                    aggregateStateFactories[i] = aggregators[i]
-                            .getAggregateStateFactory();
                 }
-                initPending = false;
-                outputPending = false;
             }
 
             @Override
-            public boolean outputPartialResult(FrameTupleAppender appender,
+            public void outputPartialResult(byte[] buf, int offset,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                if (!outputPending) {
-                    resultTupleBuilder.reset();
-                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                        resultTupleBuilder.addField(accessor, tIndex,
-                        		keyFieldsInPartialResults[i]);
-                    }
-                    DataOutput dos = resultTupleBuilder.getDataOutput();
 
-                    int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                    for (int i = 0; i < aggregators.length; i++) {
+                resultTupleBuilder.reset();
+                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+                    resultTupleBuilder.addField(accessor, tIndex,
+                            keyFieldsInPartialResults[i]);
+                }
+                DataOutput dos = resultTupleBuilder.getDataOutput();
+
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                for (int i = 0; i < aggregators.length; i++) {
+                    int fieldOffset = accessor.getFieldStartOffset(tIndex,
+                            keys.length + i);
+                    aggregators[i].outputPartialResult(dos, accessor
+                            .getBuffer().array(),
+                            fieldOffset + accessor.getFieldSlotsLength()
+                                    + tupleOffset, ((AggregateState[]) state
+                                    .getState())[i]);
+                    resultTupleBuilder.addFieldEndOffset();
+                }
+
+                if (buf != null)
+                    FrameToolsForGroupers.writeFields(buf, offset, this
+                            .getPartialOutputLength(accessor, tIndex, state),
+                            resultTupleBuilder);
+
+            }
+
+            @Override
+            public void outputFinalResult(byte[] buf, int offset,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+
+                resultTupleBuilder.reset();
+
+                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+                    resultTupleBuilder.addField(accessor, tIndex,
+                            keyFieldsInPartialResults[i]);
+                }
+
+                DataOutput dos = resultTupleBuilder.getDataOutput();
+
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                for (int i = 0; i < aggregators.length; i++) {
+                    if (aggregators[i].needsBinaryState()) {
                         int fieldOffset = accessor.getFieldStartOffset(tIndex,
-                                keyFields.length + i);
-                        aggregators[i].outputPartialResult(dos, accessor
+                                keys.length + i);
+                        aggregators[i].outputFinalResult(dos, accessor
                                 .getBuffer().array(),
-                                fieldOffset + accessor.getFieldSlotsLength()
-                                        + tupleOffset,
+                                tupleOffset + accessor.getFieldSlotsLength()
+                                        + fieldOffset,
                                 ((AggregateState[]) state.getState())[i]);
-                        resultTupleBuilder.addFieldEndOffset();
+                    } else {
+                        aggregators[i].outputFinalResult(dos, null, 0,
+                                ((AggregateState[]) state.getState())[i]);
                     }
+                    resultTupleBuilder.addFieldEndOffset();
                 }
-                if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
-                        resultTupleBuilder.getByteArray(), 0,
-                        resultTupleBuilder.getSize())) {
-                    outputPending = true;
-                    return false;
-                }
-                outputPending = false;
-                return true;
 
+                if (buf != null)
+                    FrameToolsForGroupers.writeFields(buf, offset,
+                            this.getFinalOutputLength(accessor, tIndex, state),
+                            resultTupleBuilder);
             }
 
             @Override
-            public boolean outputFinalResult(FrameTupleAppender appender,
-                    IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                if (!outputPending) {
-                    resultTupleBuilder.reset();
-                    
-                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                        resultTupleBuilder.addField(accessor, tIndex,
-                        		keyFieldsInPartialResults[i]);
-                    }
- 
-                    DataOutput dos = resultTupleBuilder.getDataOutput();
+            public int getPartialOutputLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                int stateLength = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
 
-                    int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                    for (int i = 0; i < aggregators.length; i++) {
-                        if (aggregateStateFactories[i].hasBinaryState()) {
-                            int fieldOffset = accessor.getFieldStartOffset(
-                                    tIndex, keyFields.length + i);
-                            aggregators[i].outputFinalResult(dos, accessor
-                                    .getBuffer().array(), tupleOffset
+                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+                    stateLength += accessor.getFieldLength(tIndex,
+                            keyFieldsInPartialResults[i]);
+                    // add length for slot offset
+                    stateLength += 4;
+                }
+
+                for (int i = 0; i < aggregators.length; i++) {
+                    int fieldOffset = 0;
+                    if (aggregators[i].needsBinaryState()) {
+                        fieldOffset = accessor.getFieldStartOffset(tIndex,
+                                keys.length + i);
+                    }
+                    stateLength += aggregators[i].getPartialResultLength(
+                            accessor.getBuffer().array(), tupleOffset
                                     + accessor.getFieldSlotsLength()
+                                    + fieldOffset,
+                            ((AggregateState[]) state.getState())[i]);
+                    // add length for slot offset
+                    stateLength += 4;
+                }
+                return stateLength;
+            }
+
+            @Override
+            public int getFinalOutputLength(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                int stateLength = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+
+                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+                    stateLength += accessor.getFieldLength(tIndex,
+                            keyFieldsInPartialResults[i]);
+                    // add length for slot offset
+                    stateLength += 4;
+                }
+
+                for (int i = 0; i < aggregators.length; i++) {
+                    int fieldOffset = 0;
+                    if (aggregators[i].needsBinaryState()) {
+                        fieldOffset = accessor.getFieldStartOffset(tIndex,
+                                keys.length + i);
+                    }
+                    stateLength += aggregators[i].getFinalResultLength(accessor
+                            .getBuffer().array(),
+                            tupleOffset + accessor.getFieldSlotsLength()
                                     + fieldOffset, ((AggregateState[]) state
                                     .getState())[i]);
-                        } else {
-                            aggregators[i].outputFinalResult(dos, null, 0,
-                                    ((AggregateState[]) state.getState())[i]);
-                        }
-                        resultTupleBuilder.addFieldEndOffset();
-                    }
+                    // add length for slot offset
+                    stateLength += 4;
                 }
-                if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
-                        resultTupleBuilder.getByteArray(), 0,
-                        resultTupleBuilder.getSize())) {
-                    outputPending = true;
-                    return false;
-                }
-                outputPending = false;
-                return true;
+                return stateLength;
             }
 
             @Override
-            public boolean init(FrameTupleAppender appender,
+            public void init(byte[] buf, int offset,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                if (!initPending) {
-                    stateTupleBuilder.reset();
-                    for (int i = 0; i < keyFields.length; i++) {
-                        stateTupleBuilder.addField(accessor, tIndex,
-                                keyFields[i]);
-                    }
-                    DataOutput dos = stateTupleBuilder.getDataOutput();
 
-                    for (int i = 0; i < aggregators.length; i++) {
-                        aggregators[i].init(accessor, tIndex, dos,
-                                ((AggregateState[]) state.getState())[i]);
-                        if (aggregateStateFactories[i].hasBinaryState()) {
-                            stateTupleBuilder.addFieldEndOffset();
-                        }
+                stateTupleBuilder.reset();
+                for (int i = 0; i < keys.length; i++) {
+                    stateTupleBuilder.addField(accessor, tIndex, keys[i]);
+                }
+                DataOutput dos = stateTupleBuilder.getDataOutput();
+
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].init(accessor, tIndex, dos,
+                            ((AggregateState[]) state.getState())[i]);
+                    if (aggregators[i].needsBinaryState()) {
+                        stateTupleBuilder.addFieldEndOffset();
                     }
                 }
-                // For pre-cluster: no output state is needed
-                if(appender == null){
-                    initPending = false;
-                    return true;
-                }
-                if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
-                        stateTupleBuilder.getByteArray(), 0,
-                        stateTupleBuilder.getSize())) {
-                    initPending = true;
-                    return false;
-                }
-                initPending = false;
-                return true;
+                if (buf != null)
+                    FrameToolsForGroupers.writeFields(buf, offset, this
+                            .getBinaryAggregateStateLength(accessor, tIndex,
+                                    state), stateTupleBuilder);
             }
 
             @Override
             public AggregateState createAggregateStates() {
-                AggregateState aggregateStates = new AggregateState();
-                AggregateState[] states = new AggregateState[aggregateStateFactories.length];
+                AggregateState[] states = new AggregateState[aggregators.length];
                 for (int i = 0; i < states.length; i++) {
-                    states[i] = new AggregateState();
-                    states[i]
-                            .setState(aggregateStateFactories[i].createState());
+                    states[i] = aggregators[i].createState();
                 }
-                aggregateStates.setState(states);
-                return aggregateStates;
+                return new AggregateState(states);
             }
 
             @Override
-            public int getAggregateStatesLength() {
+            public int getBinaryAggregateStateLength(
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 int stateLength = 0;
-                for (int i = 0; i < aggregateStateFactories.length; i++) {
-                    stateLength += aggregateStateFactories[i].getStateLength();
+
+                for (int i = 0; i < keys.length; i++) {
+                    stateLength += accessor
+                            .getFieldLength(tIndex, keys[i]);
+                    // add length for slot offset
+                    stateLength += 4;
+                }
+
+                for (int i = 0; i < aggregators.length; i++) {
+                    if (aggregators[i].needsBinaryState()) {
+                        stateLength += aggregators[i].getBinaryStateLength(
+                                accessor, tIndex,
+                                ((AggregateState[]) state.getState())[i]);
+                        // add length for slot offset
+                        stateLength += 4;
+                    }
                 }
                 return stateLength;
             }
 
             @Override
             public void close() {
-                for(int i = 0; i < aggregators.length; i++){
+                for (int i = 0; i < aggregators.length; i++) {
                     aggregators[i].close();
                 }
             }
@@ -244,10 +300,10 @@
                             .getTupleStartOffset(stateTupleIndex);
                     int fieldIndex = 0;
                     for (int i = 0; i < aggregators.length; i++) {
-                        if (aggregateStateFactories[i].hasBinaryState()) {
+                        if (aggregators[i].needsBinaryState()) {
                             int stateFieldOffset = stateAccessor
                                     .getFieldStartOffset(stateTupleIndex,
-                                            keyFields.length + fieldIndex);
+                                            keys.length + fieldIndex);
                             aggregators[i].aggregate(
                                     accessor,
                                     tIndex,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index f171239..717cea5 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -67,7 +67,7 @@
 
 	final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
 			new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-					"data/tpch0.001/lineitem.tbl"))) });
+					 "data/tpch0.001/lineitem.tbl"))) });
 
 	final RecordDescriptor desc = new RecordDescriptor(
 			new ISerializerDeserializer[] {
@@ -765,6 +765,7 @@
 						IntegerSerializerDeserializer.INSTANCE });
 
 		int[] keyFields = new int[] { 8, 0 };
+		int[] keys = new int[] {0, 1};
 		int frameLimits = 4;
 		int tableSize = 8;
 
@@ -776,11 +777,11 @@
 						UTF8StringBinaryComparatorFactory.INSTANCE,
 						UTF8StringBinaryComparatorFactory.INSTANCE },
 				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
+				new MultiFieldsAggregatorFactory(keyFields,
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(1, false),
 								new IntSumFieldAggregatorFactory(3, false) }),
-				new MultiFieldsAggregatorFactory(
+				new MultiFieldsAggregatorFactory(keys,
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(2, false),
 								new IntSumFieldAggregatorFactory(3, false) }),
@@ -966,8 +967,8 @@
 								new IntSumFieldAggregatorFactory(1, false),
 								new CountFieldAggregatorFactory(false),
 								new AvgFieldGroupAggregatorFactory(1, false) }),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
+				new MultiFieldsAggregatorFactory(new int[]{0, 1},
+						new IFieldAggregateDescriptorFactory[] { 
 								new IntSumFieldAggregatorFactory(2, false),
 								new IntSumFieldAggregatorFactory(3, false),
 								new AvgFieldMergeAggregatorFactory(4, false) }),
@@ -1148,7 +1149,7 @@
 								new IntSumFieldAggregatorFactory(1, false),
 								new MinMaxStringFieldAggregatorFactory(15,
 										true, true) }),
-				new MultiFieldsAggregatorFactory(
+				new MultiFieldsAggregatorFactory(new int[] {0, 1},
 						new IFieldAggregateDescriptorFactory[] {
 								new IntSumFieldAggregatorFactory(2, false),
 								new MinMaxStringFieldAggregatorFactory(3, true,