Fixed the missing-frames bug in the newly refactored aggregators during the merging phase.

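The aggregator interface now passes partial results as raw byte ranges instead of frame
accessors: aggregate() and merge() take (data, offset, length), update the packed state in
place, and return the number of bytes consumed, so composed aggregators such as
MultiAggregatorDescriptorFactory can walk a multi-field state. Tuple construction has moved
out of the aggregators: init(), outputPartialResult() and outputResult() append fields to a
caller-supplied ArrayTupleBuilder, and the spillable grouping table now builds the key
fields and appends the tuple itself. A reset() method was added so aggregators can be
reused across runs.

A minimal sketch of the new calling pattern (caller-side names such as stateOffset and
stateLength are illustrative only, not part of this patch):

    // Build the group's first tuple: keys first, then the initial partial state.
    ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
    tb.reset();
    for (int i = 0; i < keyFields.length; i++) {
        tb.addField(accessor, tIndex, keyFields[i]); // keys are now the caller's job
    }
    aggregator.init(accessor, tIndex, tb); // appends the initialized partial state

    // For each further input tuple of the same group, update the packed state in place;
    // data/stateOffset/stateLength locate the state field inside the stored tuple.
    int consumed = aggregator.aggregate(accessor, tIndex, data, stateOffset, stateLength);
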
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_spilling_groupby_perf@393 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
index 486fffa..7baa384 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -20,7 +20,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.io.DataOutput;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
@@ -29,12 +28,12 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 /**
  * @author jarodwen
  */
 public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
     private static final long serialVersionUID = 1L;
 
     private static final int INIT_ACCUMULATORS_SIZE = 8;
@@ -58,8 +57,6 @@
         if (this.outField < 0)
             this.outField = keyFields.length;
 
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-
         return new IAggregatorDescriptor() {
 
             byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
@@ -68,47 +65,55 @@
             int aggregatorCount = 0;
 
             @Override
-            public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+            public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
-                int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                int fieldCount = accessor2.getFieldCount();
-                int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                int fieldLength = accessor2.getFieldLength(tIndex2, outField);
 
                 // FIXME Should be done in binary way
+                int bufIndex = IntegerSerializerDeserializer.getInt(data, offset);
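+                // A negative bufIndex means the string is stored inline right after the index; otherwise it lives in buf[bufIndex]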
                 StringBuilder sbder = new StringBuilder();
-                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
-                        new ByteArrayInputStream(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
-                                + fieldStart, fieldLength))));
+                if (bufIndex < 0) {
+                    // Need to allocate a new field
+                    currentAggregatorIndex++;
+                    aggregatorCount++;
+                    if (currentAggregatorIndex >= buf.length) {
+                        // Grow the accumulator array before using the new slot, as init() does
+                        byte[][] newBuf = new byte[buf.length * 2][];
+                        System.arraycopy(buf, 0, newBuf, 0, buf.length);
+                        this.buf = newBuf;
+                    }
+                    bufIndex = currentAggregatorIndex;
+                    sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                            new ByteArrayInputStream(data, offset + 4, length - 4))));
+                } else {
+                    sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                            new ByteArrayInputStream(buf[bufIndex], 0, buf[bufIndex].length))));
+                }
+
+                int utfLength = ((data[offset + 4] & 0xff) << 8) + (data[offset + 4 + 1] & 0xff);
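+                // utfLength comes from the 2-byte big-endian length prefix that precedes the UTF-8 bytes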
 
                 // Get the new data
-                tupleOffset = accessor1.getTupleStartOffset(tIndex1);
-                fieldCount = accessor1.getFieldCount();
-                fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
-                fieldLength = accessor1.getFieldLength(tIndex1, outField);
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int fieldLength = accessor.getFieldLength(tIndex, outField);
                 sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
-                        new ByteArrayInputStream(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
-                                + fieldStart, fieldLength))));
+                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                                + fieldStart + 4, fieldLength - 4))));
 
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
-                ByteBuffer wrapBuf = accessor2.getBuffer();
-                wrapBuf.position(tupleOffset + 2 * fieldCount + fieldStart);
-                wrapBuf.put(baos.toByteArray());
+                buf[bufIndex] = baos.toByteArray();
+
+                // Update the ref index
+                ByteBuffer refBuf = ByteBuffer.wrap(data, offset, 4);
+                refBuf.putInt(bufIndex);
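+                // Bytes consumed in data: 4 (ref index) + 2 (UTF length prefix) + utfLength (string bytes)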
+                return 4 + 2 + utfLength;
             }
 
             @Override
-            public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, keyFields[i]);
-                }
                 // Initialize the aggregation value
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int tupleEndOffset = accessor.getTupleEndOffset(tIndex);
                 int fieldCount = accessor.getFieldCount();
                 int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+                int fieldLength = accessor.getFieldLength(tIndex, concatField);
                 int appendOffset = tupleOffset + 2 * fieldCount + fieldStart;
                 // Get the initial value
                 currentAggregatorIndex++;
@@ -119,149 +124,100 @@
                     }
                     this.buf = newBuf;
                 }
-                buf[currentAggregatorIndex] = new byte[tupleEndOffset - appendOffset + 1];
-                System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
-                        tupleEndOffset - appendOffset + 1);
+                buf[currentAggregatorIndex] = new byte[fieldLength];
+                System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0, fieldLength);
                 // Update the aggregator index
                 aggregatorCount++;
 
-                tb.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-
-                    return false;
+                try {
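+                    // Store only the reference index in the output tuple; the actual string stays in buf[]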
+                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
                 }
-                return true;
+            }
+
+            @Override
+            public void reset() {
+                currentAggregatorIndex = -1;
+                aggregatorCount = 0;
             }
 
             @Override
             public void close() {
                 currentAggregatorIndex = -1;
                 aggregatorCount = 0;
+                for (int i = 0; i < buf.length; i++) {
+                    buf[i] = null;
+                }
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
-                int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                int fieldCount = accessor2.getFieldCount();
-                int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                int refIndex = IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2
-                        * fieldCount + fieldStart);
+                int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
                 // FIXME Should be done in binary way
                 StringBuilder sbder = new StringBuilder();
                 sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
                         new ByteArrayInputStream(buf[refIndex]))));
                 // Get the new data
-                tupleOffset = accessor1.getTupleStartOffset(tIndex1);
-                fieldCount = accessor1.getFieldCount();
-                fieldStart = accessor1.getFieldStartOffset(tIndex1, concatField);
-                int fieldLength = accessor1.getFieldLength(tIndex1, concatField);
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+                int fieldLength = accessor.getFieldLength(tIndex, concatField);
                 sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
-                        new ByteArrayInputStream(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
                                 + fieldStart, fieldLength))));
 
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
                 buf[refIndex] = baos.toByteArray();
+                return 4;
             }
 
             @Override
-            public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                tb.addField(accessor, tIndex, outField);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldCount = accessor.getFieldCount();
                 int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
                 int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
                         * fieldCount + fieldStart);
 
-                tb.addField(buf[refIndex], 0, buf[refIndex].length);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldCount = accessor.getFieldCount();
-                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
-                int fieldLength = accessor.getFieldLength(tIndex, concatField);
-
-                // Get the initial value
-                currentAggregatorIndex++;
-                if (currentAggregatorIndex >= buf.length) {
-                    byte[][] newBuf = new byte[buf.length * 2][];
-                    for (int i = 0; i < buf.length; i++) {
-                        newBuf[i] = buf[i];
+                try {
+                    if (refIndex >= 0)
+                        tupleBuilder.getDataOutput().write(buf[refIndex]);
+                    else {
+                        int fieldLength = accessor.getFieldLength(tIndex, outField);
+                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart
+                                + 4, fieldLength - 4);
                     }
-                    this.buf = newBuf;
-                }
-                buf[currentAggregatorIndex] = new byte[fieldLength];
-                System.arraycopy(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart,
-                        buf[currentAggregatorIndex], 0, fieldLength);
-                // Update the aggregator index
-                aggregatorCount++;
-
-                try {
-                    dataOutput.writeInt(currentAggregatorIndex);
+                    tupleBuilder.addFieldEndOffset();
                 } catch (IOException e) {
                     throw new HyracksDataException();
                 }
             }
 
             @Override
-            public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldCount = accessor.getFieldCount();
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
                 int fieldLength = accessor.getFieldLength(tIndex, outField);
+                int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
+                        * fieldCount + fieldOffset);
+
                 try {
-                    dataOutput.write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount + fieldStart,
-                            fieldLength);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-            
-            @Override
-            public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldCount = accessor.getFieldCount();
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                int bufIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2
-                        * fieldCount + fieldStart);
-                try {
-                    dataOutput.write(buf[bufIndex]);
+                    tupleBuilder.getDataOutput().writeInt(-1);
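+                    // A ref index of -1 tells the merge phase that the string follows inline instead of living in buf[]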
+                    if (refIndex < 0) {
+                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + fieldCount * 2 + fieldOffset + 4, fieldLength - 4);
+                    } else {
+                        tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
+                    }
+                    tupleBuilder.addFieldEndOffset();
                 } catch (IOException e) {
                     throw new HyracksDataException();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
index 190ae53..f3648ab 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.aggregators;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -23,7 +22,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 /**
@@ -49,7 +47,6 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         if (this.outField < 0) {
             this.outField = keyFields.length;
@@ -57,21 +54,9 @@
         return new IAggregatorDescriptor() {
             
             @Override
-            public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, keyFields[i]);
-                }
-                // Insert the aggregation value
-                tb.addField(IntegerSerializerDeserializer.INSTANCE, 1);
-
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
+                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
             }
 
             @Override
@@ -79,89 +64,48 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
-                int count = 0;
-
-                int tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                int fieldCount = accessor2.getFieldCount();
-                int fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                count += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart) + 1;
-                // Update the value of tuple 2
-                ByteBuffer buf = accessor2.getBuffer();
-                buf.position(tupleOffset + 2 * fieldCount + fieldStart);
-                buf.putInt(count);
+                ByteBuffer buf = ByteBuffer.wrap(data);
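+                // The packed partial result is a single 4-byte count; increment it in place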
+                int count = buf.getInt(offset);
+                buf.putInt(offset, count + 1);
+                return 4;
             }
 
             @Override
-            public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                // Insert the aggregation value
-                tb.addField(accessor, tIndex, outField);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
 
-            @Override
-            public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                // Insert the aggregation value
-                tb.addField(accessor, tIndex, outField);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-                    throws HyracksDataException {
-                int count = 0;
-                // Get value from tuple 1
-                int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
-                int fieldCount = accessor1.getFieldCount();
-                int fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
-                count += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                // Get value from tuple 2
-                tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                fieldCount = accessor2.getFieldCount();
-                fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                count += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                // Update the value of tuple 2
-                ByteBuffer buf = accessor2.getBuffer();
-                buf.position(tupleOffset + 2 * fieldCount + fieldStart);
-                buf.putInt(count);
-            }
-
-            @Override
-            public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
                 try {
-                    dataOutput.writeInt(1);
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                            + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
                 } catch (IOException e) {
-                    throw new HyracksDataException();
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
                 }
             }
 
             @Override
-            public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                            + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
                 int count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -169,29 +113,17 @@
                 int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
                 count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
                         + fieldStart);
-
-                try {
-                    dataOutput.writeInt(count);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
+                // Merge the incoming partial count into the packed count in place
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                count += buf.getInt(offset);
+                buf.putInt(offset, count);
+                return 4;
             }
-            
-            @Override
-            public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                int count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldCount = accessor.getFieldCount();
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
 
-                try {
-                    dataOutput.writeInt(count);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
+            @Override
+            public void reset() {
+                // The count lives entirely in the packed partial result, so there is no state to reset
             }
         };
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
index b3eb90f..1dd837b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
@@ -14,143 +14,87 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.aggregators;
 
-import java.io.DataOutput;
-
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public interface IAggregatorDescriptor {
 
     /**
-     * Initialize the aggregator for the group indicated by the input tuple. Keys,
-     * and the aggregation values will be written out onto the frame wrapped in the appender.
+     * Initialize the aggregator with the input tuple specified by the frame accessor and the
+     * tuple index, and write the initialized partial result into the given tuple builder.
      * 
      * @param accessor
-     *            The frame containing the input tuple.
      * @param tIndex
-     *            The tuple index in the frame.
-     * @param appender
-     *            The output frame.
+     * @param tupleBuilder
+     * @throws HyracksDataException
+     */
+    public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException;
+
+    /**
+     * Aggregate the input tuple into the partial result stored in the given byte range. The
+     * updated value is written back in place.
+     * 
+     * It is the developer's responsibility to ensure that the updated result does not exceed
+     * the given length.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param data
+     *            The byte array holding the packed partial result.
+     * @param offset
+     *            The start offset of the partial result in the array.
+     * @param length
+     *            The number of bytes available for the partial result.
-     * @return
+     * @return The number of bytes of the partial result consumed by this call.
      * @throws HyracksDataException
      */
-    public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+    public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
             throws HyracksDataException;
 
     /**
-     * Get the initial aggregation value from the input tuple.
-     * Compared with {@link #init(IFrameTupleAccessor, int, FrameTupleAppender)}, instead of
-     * writing the value to a frame appender, the value will be written through the given {@link DataOutput}.
-     * 
-     * @param accessor
-     *            The frame containing the input tuple.
-     * @param tIndex
-     *            The tuple index in the frame.
-     * @param dataOutput
-     *            The data output wrapper.
-     * @throws HyracksDataException
-     */
-    public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-            throws HyracksDataException;
-
-    /**
-     * Aggregate the input tuple with the given partial aggregation value. The partial aggregation value will
-     * be updated after the aggregation.
-     * 
-     * @param accessor1
-     *            The frame containing the input tuple to be aggregated.
-     * @param tIndex1
-     *            The tuple index in the frame.
-     * @param accessor2
-     *            The frame containing the partial aggregation value.
-     * @param tIndex2
-     *            The tuple index of the partial aggregation value. Note that finally this value will be
-     *            updated to be the new aggregation value.
-     * @throws HyracksDataException
-     */
-    public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-            throws HyracksDataException;
-
-    /**
-     * Merge the partial aggregation value with another partial aggregation value. After merging, the aggregation
-     * value of the second partial aggregator will be updated.
-     * 
-     * @param accessor1
-     *            The frame containing the tuple of partial aggregation value to be aggregated.
-     * @param tIndex1
-     *            The index of the tuple of partial aggregation value to be aggregated.
-     * @param accessor2
-     *            The frame containing the tuple of partial aggregation value to be updated.
-     * @param tIndex2
-     *            The index of the tuple of partial aggregation value to be updated. Note that after merging,
-     *            this value will be updated to be the new aggregation value.
-     */
-    public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-            throws HyracksDataException;
-
-    /**
-     * Output the partial aggregation result into a frame appender. The outputted result can be used to
-     * aggregate a new tuple from the input frame.
-     * This method, together with the {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)},
-     * is for the case when different processing logics are applied to partial aggregation result and the merged
-     * aggregation result.
-     * For example, in an aggregator for variable-length aggregation results, aggregation values should be maintained
-     * inside of the aggregators, instead of the frames. A reference will be used to indicate the aggregation value
-     * in the partial aggregation result, while the actual aggregation value will be used in the merged aggregation
-     * result.
+     * Merge the partial aggregation result carried by the input tuple into the partial result
+     * stored in the given byte range. The merged value is written back in place.
      * 
      * @param accessor
      * @param tIndex
-     * @param appender
+     * @param data
+     *            The byte array holding the packed partial result to be updated.
+     * @param offset
+     *            The start offset of the partial result in the array.
+     * @param length
+     *            The number of bytes available for the partial result.
-     * @return
+     * @return The number of bytes of the partial result consumed by this call.
      * @throws HyracksDataException
      */
-    public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+    public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
             throws HyracksDataException;
 
     /**
-     * Output the merged aggregation result into a frame appender. The outputted result can be used to
-     * merge another partial aggregation result.
-     * See {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)} for the difference
-     * between these two methods.
+     * Output the partial aggregation result to an array tuple builder. Any additional state
+     * needed to continue the aggregation should be included in the output.
+     * 
+     * For example, an aggregator computing AVG should maintain both the count and the current
+     * average as its partial result.
+     * 
      * @param accessor
      * @param tIndex
-     * @param appender
+     * @param tupleBuilder
+     * @throws HyracksDataException
+     */
+    public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException;
+
+    /**
+     * Output the final aggregation result to an array tuple builder.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param tupleBuilder
-     * @return
      * @throws HyracksDataException
      */
-    public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+    public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
             throws HyracksDataException;
 
-    /**
-     * Get the partial aggregation value from the merged aggregator indicated by the frame and the tuple index.
-     * Compared with {@link #outputPartialResult(IFrameTupleAccessor, int, FrameTupleAppender)}, this will output the value
-     * through the given {@link DataOutput}.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param dataOutput
-     * @throws HyracksDataException
-     */
-    public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-            throws HyracksDataException;
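+    /**
+     * Reset the aggregator so that it can be reused for a new run of aggregation.
+     */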
+    public void reset();
     
     /**
-     * Get the merged aggregation value from the merged aggregator indicated by the frame and the tuple index.
-     * Compared with {@link #outputMergeResult(IFrameTupleAccessor, int, FrameTupleAppender)}, this will output the value
-     * through the given {@link DataOutput}.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param dataOutput
-     * @throws HyracksDataException
-     */
-    public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-            throws HyracksDataException;
-
-    /**
      * Close the aggregator. Necessary clean-up code should be implemented here.
      */
     public void close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
index 28a5c79..23a90d6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.io.DataOutput;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
@@ -24,7 +23,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 /**
  * @author jarodwen
@@ -49,8 +47,6 @@
     public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
 
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-
         if (this.outField < 0) {
             this.outField = keyFields.length;
         }
@@ -58,105 +54,7 @@
         return new IAggregatorDescriptor() {
 
             @Override
-            public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, keyFields[i]);
-                }
-                // Insert the aggregation value
-                tb.addField(accessor, tIndex, aggField);
-
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-                    throws HyracksDataException {
-                int sum = 0;
-                int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
-                int fieldCount = accessor1.getFieldCount();
-                int fieldStart = accessor1.getFieldStartOffset(tIndex1, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                fieldCount = accessor2.getFieldCount();
-                fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                sum += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                // Update the value of tuple 2
-                ByteBuffer buf = accessor2.getBuffer();
-                buf.position(tupleOffset + 2 * fieldCount + fieldStart);
-                buf.putInt(sum);
-            }
-
-            @Override
-            public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                // Insert the aggregation value
-                tb.addField(accessor, tIndex, outField);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                // Insert the aggregation value
-                tb.addField(accessor, tIndex, outField);
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-                    throws HyracksDataException {
-                int sum = 0;
-                // Get value from tuple 1
-                int tupleOffset = accessor1.getTupleStartOffset(tIndex1);
-                int fieldCount = accessor1.getFieldCount();
-                int fieldStart = accessor1.getFieldStartOffset(tIndex1, outField);
-                sum += IntegerSerializerDeserializer.getInt(accessor1.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                // Get value from tuple 2
-                tupleOffset = accessor2.getTupleStartOffset(tIndex2);
-                fieldCount = accessor2.getFieldCount();
-                fieldStart = accessor2.getFieldStartOffset(tIndex2, outField);
-                sum += IntegerSerializerDeserializer.getInt(accessor2.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
-                // Update the value of tuple 2
-                ByteBuffer buf = accessor2.getBuffer();
-                buf.position(tupleOffset + 2 * fieldCount + fieldStart);
-                buf.putInt(sum);
-            }
-
-            @Override
-            public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -165,15 +63,63 @@
                 sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
                         + fieldStart);
 
+                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                        + fieldStart);
+                // Add the input value to the packed partial sum in place
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                sum += buf.getInt(offset);
+                buf.putInt(offset, sum);
+                return 4;
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
                 try {
-                    dataOutput.writeInt(sum);
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                            + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
                 } catch (IOException e) {
-                    throw new HyracksDataException();
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
                 }
             }
 
             @Override
-            public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldCount = accessor.getFieldCount();
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
+                            + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -181,29 +127,17 @@
                 int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
                 sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
                         + fieldStart);
-
-                try {
-                    dataOutput.writeInt(sum);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
+                // Merge the incoming partial sum into the packed sum in place
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                sum += buf.getInt(offset);
+                buf.putInt(offset, sum);
+                return 4;
             }
-            
-            @Override
-            public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                int sum = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldCount = accessor.getFieldCount();
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset + 2 * fieldCount
-                        + fieldStart);
 
-                try {
-                    dataOutput.writeInt(sum);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
+            @Override
+            public void reset() {
+                // The sum lives entirely in the packed partial result, so there is no state to reset
             }
         };
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
index ef34569..e02087b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -14,14 +14,12 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.aggregators;
 
-import java.io.DataOutput;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 /**
  * @author jarodwen
@@ -30,7 +28,7 @@
 public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
-    
+
     private final IAggregatorDescriptorFactory[] aggregatorFactories;
 
     public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
@@ -45,7 +43,6 @@
             final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
             throws HyracksDataException {
 
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
         final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
         for (int i = 0; i < aggregators.length; i++) {
             aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
@@ -55,78 +52,47 @@
         return new IAggregatorDescriptor() {
 
             @Override
-            public boolean init(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, keyFields[i]);
-                }
-                DataOutput dos = tb.getDataOutput();
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getInitValue(accessor, tIndex, dos);
-                    tb.addFieldEndOffset();
-                }
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void aggregate(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].aggregate(accessor1, tIndex1, accessor2, tIndex2);
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].init(accessor, tIndex, tupleBuilder);
                 }
             }
 
             @Override
-            public void merge(IFrameTupleAccessor accessor1, int tIndex1, IFrameTupleAccessor accessor2, int tIndex2)
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                     throws HyracksDataException {
+                int adjust = 0;
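+                // Each nested aggregator consumes its slice of the packed state; advance by the bytes it reports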
                 for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].merge(accessor1, tIndex1, accessor2, tIndex2);
+                    adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
+                }
+                return adjust;
+            }
+
+            @Override
+            public int merge(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int adjust = 0;
+                for (int i = 0; i < aggregators.length; i++) {
+                    adjust += aggregators[i].merge(accessor, tIndex, data, offset + adjust, length - adjust);
+                }
+                return adjust;
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
                 }
             }
 
             @Override
-            public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                     throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
                 }
-                DataOutput dos = tb.getDataOutput();
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getPartialOutputValue(accessor, tIndex, dos);
-                    tb.addFieldEndOffset();
-                }
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public boolean outputMergeResult(IFrameTupleAccessor accessor, int tIndex, FrameTupleAppender appender)
-                    throws HyracksDataException {
-                // Construct the tuple using keys and sum value
-                tb.reset();
-                for (int i = 0; i < keyFields.length; i++) {
-                    tb.addField(accessor, tIndex, i);
-                }
-                DataOutput dos = tb.getDataOutput();
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getMergeOutputValue(accessor, tIndex, dos);
-                    tb.addFieldEndOffset();
-                }
-                // Write the tuple out
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    return false;
-                }
-                return true;
             }
 
             @Override
@@ -137,30 +103,12 @@
             }
 
             @Override
-            public void getInitValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getInitValue(accessor, tIndex, dataOutput);
-                }
-            }
-
-            @Override
-            public void getPartialOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getPartialOutputValue(accessor, tIndex, dataOutput);
-                }
-            }
-            
-            @Override
-            public void getMergeOutputValue(IFrameTupleAccessor accessor, int tIndex, DataOutput dataOutput)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].getMergeOutputValue(accessor, tIndex, dataOutput);
+            public void reset() {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].reset();
                 }
             }
 
         };
     }
-
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
index ea486f1..fdf4b5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/BSTSpillableGroupingTableFactory.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -35,7 +36,6 @@
 
 /**
  * @author jarodwen
- *
  */
 public class BSTSpillableGroupingTableFactory implements ISpillableTableFactory {
 
@@ -61,8 +61,7 @@
             storedKeys[i] = i;
             storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
         }
-        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
-                outRecordDescriptor);
+        final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
 
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -75,6 +74,8 @@
 
         final ByteBuffer outFrame = ctx.allocateFrame();
 
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+
         return new ISpillableTable() {
 
             /**
@@ -130,13 +131,21 @@
                 }
 
                 if (!foundGroup) {
-                    // Did not find the aggregator. Insert a new aggregator entry
-                    if(!aggregator.init(accessor, tIndex, appender)){
-                        if(!nextAvailableFrame()){
+                    // No matching group was found: build a tuple of the group keys
+                    // and let the aggregator append its initial state to it
+                    tupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        tupleBuilder.addField(accessor, tIndex, keyFields[i]);
+                    }
+                    aggregator.init(accessor, tIndex, tupleBuilder);
+                    if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        if (!nextAvailableFrame()) {
                             return false;
                         } else {
-                            if(!aggregator.init(accessor, tIndex, appender)){
-                                throw new IllegalStateException("Failed to init an aggergator.");
+                            if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                    tupleBuilder.getSize())) {
+                                throw new IllegalStateException("Failed to append the new group to an empty frame");
                             }
                         }
                     }
@@ -145,7 +154,13 @@
                     // Add entry into the binary search tree
                     bstreeAdd(sbIndex, stIndex, parentPtr, isSmaller);
                 } else {
-                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, stIndex);
+                    // A matching group was found: aggregate directly into its stored state
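+                    // The aggregate state sits after the group keys; skip the 2-byte
+                    // field end-offset slots stored at the head of the stored tuple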
+                    int tupleOffset = storedKeysAccessor1.getTupleStartOffset(stIndex);
+                    int fieldCount = storedKeysAccessor1.getFieldCount();
+                    int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(stIndex, keyFields.length);
+                    int tupleEnd = storedKeysAccessor1.getTupleEndOffset(stIndex);
+                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset + 2
+                            * fieldCount + aggFieldOffset, tupleEnd - (tupleOffset + 2 * fieldCount + aggFieldOffset));
                 }
                 return true;
             }
@@ -161,11 +176,11 @@
             }
 
             @Override
-            public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 writer.open();
                 appender.reset(outFrame, true);
-                traversalBSTree(writer, 0, appender);
+                traversalBSTree(writer, 0, appender, isPartial);
                 if (appender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(outFrame, writer);
                 }
@@ -211,20 +226,33 @@
                 return true;
             }
 
-            private void traversalBSTree(IFrameWriter writer, int nodePtr, FrameTupleAppender appender)
+            private void traversalBSTree(IFrameWriter writer, int nodePtr, FrameTupleAppender appender, boolean isPartial)
                     throws HyracksDataException {
                 if (nodePtr < 0 || nodePtr >= bstreeSize)
                     return;
-                traversalBSTree(writer, bstree[nodePtr + 2], appender);
+                traversalBSTree(writer, bstree[nodePtr + 2], appender, isPartial);
 
                 int bIndex = bstree[nodePtr];
                 int tIndex = bstree[nodePtr + 1];
                 storedKeysAccessor1.reset(frames.get(bIndex));
-                while (!aggregator.outputPartialResult(storedKeysAccessor1, tIndex, appender)) {
+                // Rebuild the output tuple: group keys first, then the aggregate value
+                tupleBuilder.reset();
+                for (int k = 0; k < keyFields.length; k++) {
+                    tupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+                }
+                if (isPartial)
+                    aggregator.outputPartialResult(storedKeysAccessor1, tIndex, tupleBuilder);
+                else
+                    aggregator.outputResult(storedKeysAccessor1, tIndex, tupleBuilder);
+                while (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+                        0, tupleBuilder.getSize())) {
                     FrameUtils.flushFrame(outFrame, writer);
                     appender.reset(outFrame, true);
                 }
-                traversalBSTree(writer, bstree[nodePtr + 3], appender);
+
+                traversalBSTree(writer, bstree[nodePtr + 3], appender, isPartial);
             }
 
             private void bstreeAdd(int bufferIdx, int tIndex, int parentPtr, boolean isSmaller) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 0527e9e..8b0d8a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -51,7 +52,6 @@
 
 /**
  * @author jarodwen
- *
  */
 public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
 
@@ -81,9 +81,11 @@
 
     private final ISpillableTableFactory spillableTableFactory;
 
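+    /**
+     * Whether the final output should be sorted on the group keys.
+     */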
+    private final boolean isOutputSorted;
+
     public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory) {
+            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         if (framesLimit <= 1) {
@@ -96,6 +98,7 @@
         this.keyFields = keyFields;
         this.comparatorFactories = comparatorFactories;
         this.spillableTableFactory = spillableTableFactory;
+        this.isOutputSorted = isOutputSorted;
 
         // Set the record descriptor. Note that since 
         // this operator is a unary operator,
@@ -203,7 +206,7 @@
                     writer.open();
                     try {
                         gTable.sortFrames();
-                        gTable.flushFrames(writer);
+                        gTable.flushFrames(writer, true);
                     } catch (Exception ex) {
                         throw new HyracksDataException(ex);
                     } finally {
@@ -242,6 +245,9 @@
             for (int i = 0; i < keyFields.length; ++i) {
                 storedKeys[i] = i;
             }
+            // Tuple builder for assembling output tuples during the merge
+            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 /**
                  * Input frames, one for each run file.
@@ -251,7 +257,7 @@
                 /**
                  * Output frame.
                  */
-                private ByteBuffer outFrame;
+                private ByteBuffer outFrame, writerFrame;
 
                 /**
                  * List of the run files to be merged
@@ -266,6 +272,9 @@
                 private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
                         recordDescriptors[0]);
 
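+                /**
+                 * Builder and appender used on the final pass to turn partial
+                 * tuples into final results before writing them out.
+                 */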
+                private ArrayTupleBuilder finalTupleBuilder;
+                private FrameTupleAppender writerFrameAppender;
+
                 @SuppressWarnings("unchecked")
                 public void initialize() throws HyracksDataException {
                     runs = (LinkedList<RunFileReader>) env.get(RUNS);
@@ -274,7 +283,9 @@
                         if (runs.size() <= 0) {
                             ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
                             if (gTable != null) {
-                                gTable.flushFrames(writer);
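+                                // No run files were generated: sort if requested and
+                                // flush final results straight from the in-memory table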
+                                if (isOutputSorted)
+                                    gTable.sortFrames();
+                                gTable.flushFrames(writer, false);
                             }
                             env.set(GROUPTABLES, null);
                         } else {
@@ -304,15 +315,14 @@
                     IFrameWriter writer = this.writer;
                     boolean finalPass = false;
 
-                    if (runs.size() + 1 <= framesLimit) {
+                    if (runs.size() + 2 <= framesLimit) {
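+                        // Two frames are reserved beyond the in-frames: the out
+                        // frame and the final-pass writer frame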
                         // All in-frames can be fit into memory, so no run file
                         // will be produced and this will be the final pass.
                         finalPass = true;
-                        // Remove the unnecessary frames. Note that we only need
-                        // frames to contain all inFrames but no extra out frame,
-                        // since the output is directly outputed to the writer.
-                        for (int i = inFrames.size() - 1; i >= runs.size(); i--)
-                            inFrames.remove(i);
+                        // Remove the unnecessary frames.
+                        while (inFrames.size() > runs.size()) {
+                            inFrames.remove(inFrames.size() - 1);
+                        }
                     } else {
                         // Files need to be merged.
                         newRun = ctx.getJobletContext().createWorkspaceFile(
@@ -357,51 +367,57 @@
                         }
 
                         // Start merging
-                        int keyIndexInOutFrame = -1;
-
                         while (!topTuples.areRunsExhausted()) {
                             // Get the top record
                             ReferenceEntry top = topTuples.peek();
                             int tupleIndex = top.getTupleIndex();
                             int runIndex = topTuples.peek().getRunid();
                             FrameTupleAccessor fta = top.getAccessor();
-                            if (keyIndexInOutFrame < 0) {
+                            int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                            if (currentTupleInOutFrame < 0
+                                    || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
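+                                // The top tuple starts a new group: the out frame is
+                                // empty or its last tuple carries a different key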
                                 // Initialize the first output record
-                                if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender)) {
-                                    FrameUtils.flushFrame(outFrame, writer);
-                                    outFrameAppender.reset(outFrame, true);
-                                    if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender))
-                                        throw new IllegalStateException("Failed to flush a merge frame!");
+                                // Reset the tuple builder
+                                tupleBuilder.reset();
+
+                                for (int i = 0; i < keyFields.length; i++) {
+                                    tupleBuilder.addField(fta, tupleIndex, i);
                                 }
-                                keyIndexInOutFrame = outFrameAccessor.getTupleCount() - 1;
+
+                                currentWorkingAggregator.outputPartialResult(fta, tupleIndex, tupleBuilder);
+
+                                if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+                                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                                    // Flush through flushOutFrame so that results are
+                                    // finalized when this is the final pass
+                                    flushOutFrame(writer, finalPass);
+                                    if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+                                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+                                        throw new HyracksDataException(
+                                                "Failed to append an aggregation result to the output frame.");
+                                }
                             } else {
-                                if (compareFrameTuples(fta, tupleIndex, outFrameAccessor, keyIndexInOutFrame) == 0) {
-                                    // if new tuple is in the same group of the current aggregator
-                                    // do merge and output to the outFrame
-                                    currentWorkingAggregator.merge(fta, tupleIndex, outFrameAccessor,
-                                            keyIndexInOutFrame);
-                                } else {
-                                    // otherwise, create a new output record in the outFrame
-                                    if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex, outFrameAppender)) {
-                                        FrameUtils.flushFrame(outFrame, writer);
-                                        outFrameAppender.reset(outFrame, true);
-                                        if (!currentWorkingAggregator.outputMergeResult(fta, tupleIndex,
-                                                outFrameAppender))
-                                            throw new IllegalStateException("Failed to flush a merge frame!");
-                                    }
-                                    keyIndexInOutFrame = outFrameAccessor.getTupleCount() - 1;
-                                }
+                                // The tuple belongs to the same group as the last
+                                // tuple in the outFrame: merge into its stored state
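+                                // Locate the partial aggregate field by skipping the
+                                // 2-byte field end-offset slots at the tuple head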
+                                int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
+                                int fieldCount = outFrameAccessor.getFieldCount();
+                                int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
+                                        keyFields.length);
+                                int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
+                                        keyFields.length);
+                                currentWorkingAggregator.merge(fta, tupleIndex, outFrameAccessor.getBuffer().array(),
+                                        tupleOffset + 2 * fieldCount + fieldOffset, fieldLength);
+
                             }
                             tupleIndices[runIndex]++;
                             setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
                         }
-                        // After processing all records, flush the aggregator
-                        currentWorkingAggregator.close();
                         // Flush the outFrame
                         if (outFrameAppender.getTupleCount() > 0) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            outFrameAppender.reset(outFrame, true);
+                            flushOutFrame(writer, finalPass);
                         }
+                        // After processing all records, flush the aggregator
+                        currentWorkingAggregator.close();
                         // Remove the processed run files
                         runs.subList(0, inFrames.size()).clear();
                         // insert the new run file into the beginning of the run
@@ -416,6 +432,46 @@
                     }
                 }
 
+                private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
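+                    // On the final pass every partial tuple in the out frame is
+                    // converted into a final result and written through a separate
+                    // writer frame; otherwise the out frame is flushed as-is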
+                    if (isFinal) {
+                        if (finalTupleBuilder == null) {
+                            finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+                        }
+                        if (writerFrame == null) {
+                            writerFrame = ctx.allocateFrame();
+                        }
+                        if (writerFrameAppender == null) {
+                            writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                            writerFrameAppender.reset(writerFrame, true);
+                        }
+                        outFrameAccessor.reset(outFrame);
+                        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+                            finalTupleBuilder.reset();
+                            for (int j = 0; j < keyFields.length; j++) {
+                                finalTupleBuilder.addField(outFrameAccessor, i, j);
+                            }
+                            currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
+
+                            if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+                                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+                                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                            }
+                        }
+                        if (writerFrameAppender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(writerFrame, writer);
+                            writerFrameAppender.reset(writerFrame, true);
+                        }
+                    } else {
+                        FrameUtils.flushFrame(outFrame, writer);
+                    }
+                    outFrameAppender.reset(outFrame, true);
+                }
+
                 private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
                         FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
                         throws HyracksDataException {
@@ -530,4 +586,5 @@
         }
 
     }
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
index 4e6470f..4108adf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -37,7 +38,6 @@
 
 /**
  * @author jarodwen
- *
  */
 public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
 
@@ -50,14 +50,11 @@
 
     private final int tableSize;
 
-    private final boolean isSorted;
-
-    public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize, boolean isSorted) {
+    public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
         this.tpcf = tpcf;
         this.tableSize = tableSize;
-        this.isSorted = isSorted;
     }
-    
+
     /* (non-Javadoc)
      * @see edu.uci.ics.hyracks.dataflow.std.group.ISpillableTableFactory#buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[], edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
      */
@@ -73,7 +70,7 @@
             storedKeys[i] = i;
             storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
         }
-        
+
         final FrameTupleAccessor storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
         final FrameTupleAccessor storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
 
@@ -92,6 +89,8 @@
 
         final ByteBuffer outFrame = ctx.allocateFrame();
 
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+
         return new ISpillableTable() {
 
             private int dataFrameCount;
@@ -154,11 +153,19 @@
                 // Do insert
                 if (!foundGroup) {
                     // If no matching group is found, create a new aggregator
-                    if(!aggregator.init(accessor, tIndex, appender)){
-                        if(!nextAvailableFrame()){
+                    // Create a tuple for the new group
+                    tupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        tupleBuilder.addField(accessor, tIndex, keyFields[i]);
+                    }
+                    aggregator.init(accessor, tIndex, tupleBuilder);
+                    if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        if (!nextAvailableFrame()) {
                             return false;
                         } else {
-                            if(!aggregator.init(accessor, tIndex, appender)){
+                            if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                    tupleBuilder.getSize())) {
                                 throw new IllegalStateException("Failed to init an aggregator");
                             }
                         }
@@ -167,12 +174,17 @@
                     sbIndex = dataFrameCount;
                     stIndex = appender.getTupleCount() - 1;
                     link.add(sbIndex, stIndex);
-                    
+
                 } else {
                     // If there is a matching found, do aggregation directly
-                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, stIndex);
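+                    // The aggregate state sits after the group keys; skip the 2-byte
+                    // field end-offset slots stored at the head of the stored tuple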
+                    int tupleOffset = storedKeysAccessor1.getTupleStartOffset(stIndex);
+                    int fieldCount = storedKeysAccessor1.getFieldCount();
+                    int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(stIndex, keyFields.length);
+                    int tupleEnd = storedKeysAccessor1.getTupleEndOffset(stIndex);
+                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset + 2
+                            * fieldCount + aggFieldOffset, tupleEnd - (tupleOffset + 2 * fieldCount + aggFieldOffset));
                 }
-                
+
                 return true;
             }
 
@@ -187,13 +199,12 @@
             }
 
             @Override
-            public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
 
                 writer.open();
                 appender.reset(outFrame, true);
-                if(isSorted)
-                    sortFrames();
+
                 if (tPointers == null) {
                     // Not sorted
                     for (int i = 0; i < table.length; ++i) {
@@ -203,7 +214,19 @@
                                 int bIndex = link.pointers[j];
                                 int tIndex = link.pointers[j + 1];
                                 storedKeysAccessor1.reset(frames.get(bIndex));
-                                while (!aggregator.outputPartialResult(storedKeysAccessor1, tIndex, appender)) {
+                                // Rebuild the output tuple: group keys first, then the aggregate value
+                                tupleBuilder.reset();
+                                for (int k = 0; k < keyFields.length; k++) {
+                                    tupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+                                }
+                                if (isPartial)
+                                    aggregator.outputPartialResult(storedKeysAccessor1, tIndex, tupleBuilder);
+                                else
+                                    aggregator.outputResult(storedKeysAccessor1, tIndex, tupleBuilder);
+                                while (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+                                        0, tupleBuilder.getSize())) {
                                     FrameUtils.flushFrame(outFrame, writer);
                                     appender.reset(outFrame, true);
                                 }
@@ -227,10 +250,24 @@
                     storedKeysAccessor1.reset(buffer);
 
                     // Insert
-                    if (!aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, appender)) {
+                    // Rebuild the output tuple: group keys first, then the aggregate value
+                    tupleBuilder.reset();
+                    for (int k = 0; k < keyFields.length; k++) {
+                        tupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
+                    }
+                    if (isPartial)
+                        aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, tupleBuilder);
+                    else
+                        aggregator.outputResult(storedKeysAccessor1, tupleIndex, tupleBuilder);
+
+                    if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+                            0, tupleBuilder.getSize())) {
                         FrameUtils.flushFrame(outFrame, writer);
                         appender.reset(outFrame, true);
-                        if (!aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, appender)) {
+                        if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+                                0, tupleBuilder.getSize())) {
                             throw new IllegalStateException();
                         }
                     }
@@ -425,4 +462,5 @@
             return sb.toString();
         }
     }
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 462e570..05f985f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -37,5 +37,5 @@
     
     public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
     
-    public void flushFrames(IFrameWriter writer) throws HyracksDataException;
+    public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
index 1831a4d..4d4dce2 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/benchmarking/BenchmarkingDataGenTests.java
@@ -157,8 +157,7 @@
                         new CountAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(1, 2),
                         new ConcatAggregatorDescriptorFactory(4, 3) }), outRecordDescriptor,
                 new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize,
-                        true));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
index 4c9eede..eea510c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalGroupRefactorTest.java
@@ -54,16 +54,23 @@
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 
 /**
  * @author jarodwen
- *
  */
 public class ExternalGroupRefactorTest extends AbstractIntegrationTest {
+
     final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
             new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
 
+    static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
+
+    static final String outSplits1 = "nc1:" + outSplitsPrefix + File.separator + "aggregation_";
+    static final String outSplits2 = "nc2:" + outSplitsPrefix + File.separator + "aggregation_";
+
+    static final boolean isOutputFile = true;
+
     final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
@@ -82,6 +89,33 @@
             UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
             UTF8StringParserFactory.INSTANCE, }, '|');
 
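+    /**
+     * Parses a comma-separated list of "node:path" pairs, e.g.
+     * "nc1:/tmp/out.nc1", into file splits for the output writers.
+     */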
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
+
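+    /**
+     * Returns either a console printer or, when isFile is set, a plain-file
+     * writer that dumps the test output under the temporary directory.
+     */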
+    private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
+            String prefix) {
+        AbstractSingleActivityOperatorDescriptor printer;
+
+        if (!isFile)
+            printer = new PrinterOperatorDescriptor(spec);
+        else
+            printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                    parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+
+        return printer;
+    }
+
     @Test
     public void hashSingleKeyScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
@@ -92,9 +126,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, 
-                IntegerSerializerDeserializer.INSTANCE
-                });
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 3;
@@ -105,7 +137,7 @@
                 new IntSumAggregatorDescriptorFactory(1), outputRec, new HashSpillableGroupingTableFactory(
                         new FieldHashPartitionComputerFactory(keyFields,
                                 new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize, true));
+                        tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -113,8 +145,10 @@
                 new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
-        
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashSingleKeyScalarGroupTest");
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -141,7 +175,7 @@
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new IntSumAggregatorDescriptorFactory(1), outputRec, new BSTSpillableGroupingTableFactory());
+                new IntSumAggregatorDescriptorFactory(1), outputRec, new BSTSpillableGroupingTableFactory(), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -150,7 +184,7 @@
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "BSTSingleKeyScalarGroupTest");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -182,7 +216,7 @@
                         UTF8StringBinaryComparatorFactory.INSTANCE }, new IntSumAggregatorDescriptorFactory(1),
                 outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -191,7 +225,8 @@
                         UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyScalarGroupTest");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -225,7 +260,7 @@
                                 new IntSumAggregatorDescriptorFactory(2, 3) }), outputRec,
                 new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -234,7 +269,8 @@
                         UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyMultipleScalarGroupTest");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -261,18 +297,21 @@
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, new ConcatAggregatorDescriptorFactory(9),
-                outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new ConcatAggregatorDescriptorFactory(9), outputRec, new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyNonScalarGroupTest");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -281,7 +320,7 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     @Test
     public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
@@ -293,7 +332,8 @@
 
         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE});
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0, 9 };
         int frameLimits = 3;
@@ -303,10 +343,11 @@
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
                         UTF8StringBinaryComparatorFactory.INSTANCE }, new MultiAggregatorDescriptorFactory(
                         new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(1, 2),
-                                new IntSumAggregatorDescriptorFactory(2, 3), new ConcatAggregatorDescriptorFactory(9, 4) }), outputRec,
+                                new IntSumAggregatorDescriptorFactory(2, 3),
+                                new ConcatAggregatorDescriptorFactory(9, 4) }), outputRec,
                 new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize, true));
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -315,7 +356,8 @@
                         UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyMultipleFieldsGroupTest");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);