Added hybrid-hash-group operator:
- added hybrid-hash-grouper operator;
- added external hash grouper in separated classes (so it can be invoked by hybrid-hash-grouper);
- updated aggregator interface for required features by hybrid-hash-grouper.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_external_gby_fix@1336 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 5be602c..80b3b20 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
 
 public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
 
@@ -169,6 +170,28 @@
                 }
             }
 
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+
+            @Override
+            public int getFieldCount() {
+                // TODO Auto-generated method stub
+                return 0;
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+                    AggregateState state) throws HyracksDataException {
+                // it only works if the output of the aggregator fits in one
+                // frame
+                for (int i = 0; i < pipelines.length; i++) {
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+
         };
     }
 
@@ -193,10 +216,7 @@
     }
 
     /**
-     * 
-     * 
      * We suppose for now, that each subplan only produces one tuple.
-     * 
      */
     private static class AggregatorOutput implements IFrameWriter {
 
@@ -231,10 +251,8 @@
         }
 
         /**
-         * 
          * Since each pipeline only produces one tuple, this method is only
          * called by the close method of the pipelines.
-         * 
          */
         @Override
         public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 951376e..5ac7bbe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -15,6 +15,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
 
 public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
@@ -28,7 +29,9 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
             throws HyracksDataException {
+
         final int[] keys = keyFields;
+        final ArrayTupleBuilder initLengthCalBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         /**
          * one IAggregatorDescriptor instance per Gby operator
@@ -183,6 +186,60 @@
                 }
             }
 
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                initLengthCalBuilder.reset();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        int begin = initLengthCalBuilder.getSize();
+                        if (aggs[i] == null) {
+                            aggs[i] = aggFactories[i].createAggregateFunction();
+                        }
+                        aggs[i].init(initLengthCalBuilder.getDataOutput());
+                        initLengthCalBuilder.addFieldEndOffset();
+                        stateFieldLength[i] = initLengthCalBuilder.getSize() - begin;
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+
+                // doing initial aggregate
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = initLengthCalBuilder.getByteArray();
+                        int prevFieldPos = i + keys.length - 1;
+                        int start = prevFieldPos >= 0 ? initLengthCalBuilder.getFieldEndOffsets()[prevFieldPos] : 0;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                return initLengthCalBuilder.getFieldEndOffsets().length * 4 + initLengthCalBuilder.getSize();
+            }
+
+            @Override
+            public int getFieldCount() {
+                return aggs.length;
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+                    AggregateState state) throws HyracksDataException {
+                ftr.reset(accessor, tIndex);
+                int stateTupleStart = stateAccessor.getTupleStartOffset();
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = stateAccessor.getBuffer().array();
+                        int start = stateAccessor.getFieldStartOffset(i + keys.length) + stateTupleStart;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
         };
     }
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 7a36943..e58e5ef 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
 
 public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
 
@@ -162,6 +163,23 @@
                 }
             }
 
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                return 0;
+            }
+
+            @Override
+            public int getFieldCount() {
+                return aggFactories.length;
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
         };
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java
new file mode 100644
index 0000000..8d8881c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java
@@ -0,0 +1,805 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public abstract class AbstractImprovedHybridHashGrouper implements IFrameAllocator {
+
+    public final int NO_MORE_FREE_BUFFER = -1;
+    private final int END_OF_PARTITION = -1;
+    private final int INVALID_BUFFER = -2;
+    private final int UNALLOCATED_FRAME = -3;
+
+    private final static int INT_SIZE = 4;
+
+    private RunFileWriter[] spilledPartitionWriters;
+
+    private final IHyracksTaskContext ctx;
+
+    private final int framesLimit;
+
+    private final int frameSize;
+
+    private final int[] keys, storedKeys;
+
+    private final ITuplePartitionComputer aggregatePartitionComputer;
+
+    private final IAggregatorDescriptor aggregator, merger;
+
+    private AggregateState[] partitionAggregateStates;
+
+    private AggregateState mergeState;
+
+    private final IFrameWriter finalWriter;
+
+    private final FrameTupleAccessor inputAccessor, partitionAccessor;
+
+    private final TupleInFrameAccessor hashtableEntryAccessor;
+
+    private final FrameTupleAppender appender;
+
+    private ByteBuffer[] frames;
+
+    protected final int numOfPartitions;
+
+    protected final int hashtableSize;
+
+    protected int[] partitionBeginningFrame, partitionCurrentFrame, framesNext, partitionSizeInTuple,
+            partitionSizeInFrame, hashtableEntrySize, framesOffset;
+
+    // hashtable headers
+    private int[] hashtableHeaders;
+
+    private int nextFreeFrameIndex, freeFramesCounter;
+
+    /**
+     * Status of the partition (0: resident partition, 1: spilled partition)
+     */
+    protected BitSet partitionStatus;
+
+    /**
+     * Tuple builder for aggregation
+     */
+    private ArrayTupleBuilder internalTupleBuilderForInsert, internalTupleBuilderForFlush;
+
+    private int hashtableSizeForPartition;
+
+    private ByteBuffer outputBuffer;
+
+    private FrameTupleAccessor mergeAccessor;
+
+    private RecordDescriptor internalRecordDescriptor;
+
+    private ITuplePartitionComputer mergePartitionComputer;
+
+    private final IBinaryComparator[] comparators;
+
+    /*
+     * For instrumenting
+     */
+
+    private int residentPartitionCount = 0, spilledPartitionCount = 0, reloadSpilledPartitionCount = 0;
+
+    private static final Logger LOGGER = Logger.getLogger(AbstractImprovedHybridHashGrouper.class.getSimpleName());
+
+    public AbstractImprovedHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit, int[] keys, int[] storedKeys,
+            int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
+            IBinaryComparator[] comparators, ITuplePartitionComputer aggregateHashtablePartitionComputer,
+            ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException {
+        this.framesLimit = framesLimit;
+        this.keys = keys;
+        this.hashtableSize = hashtableSize;
+        this.storedKeys = storedKeys;
+        this.aggregatePartitionComputer = aggregateHashtablePartitionComputer;
+        this.mergePartitionComputer = mergeHashtablePartitionComputer;
+        this.finalWriter = finalOutputWriter;
+        this.ctx = ctx;
+        this.frameSize = ctx.getFrameSize();
+        this.comparators = comparators;
+
+        // initialize the accessors and appenders
+        this.internalRecordDescriptor = outRecDesc;
+
+        if (keys.length >= outRecDesc.getFields().length) {
+            // for the case of zero-aggregations
+            ISerializerDeserializer<?>[] fields = outRecDesc.getFields();
+            ITypeTraits[] types = outRecDesc.getTypeTraits();
+            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+            for (int i = 0; i < fields.length; i++)
+                newFields[i] = fields[i];
+            ITypeTraits[] newTypes = null;
+            if (types != null) {
+                newTypes = new ITypeTraits[types.length + 1];
+                for (int i = 0; i < types.length; i++)
+                    newTypes[i] = types[i];
+            }
+            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+        }
+
+        this.inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+        this.partitionAccessor = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+        this.hashtableEntryAccessor = new TupleInFrameAccessor(internalRecordDescriptor);
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.internalTupleBuilderForInsert = new ArrayTupleBuilder(internalRecordDescriptor.getFieldCount());
+        this.internalTupleBuilderForFlush = new ArrayTupleBuilder(internalRecordDescriptor.getFieldCount());
+
+        // initialize the aggregator and merger
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecDesc, outRecDesc, keys, storedKeys);
+        this.merger = mergerFactory.createAggregator(ctx, outRecDesc, outRecDesc, storedKeys, storedKeys);
+
+        // set the free frames linked list
+        // - free frame starts from beginning
+        nextFreeFrameIndex = 0;
+        // - next links for frames
+        frames = new ByteBuffer[framesLimit];
+        framesNext = new int[framesLimit];
+        framesOffset = new int[framesLimit];
+        for (int i = 0; i < framesNext.length; i++) {
+            framesNext[i] = UNALLOCATED_FRAME;
+            framesOffset[i] = 0;
+        }
+        freeFramesCounter = framesLimit;
+
+        int hashtableHeaderCount = getHeaderPageCount(hashtableSize, ctx.getFrameSize());
+        if (hashtableHeaderCount > framesLimit) {
+            throw new HyracksDataException("Not enough frames to initialize the hash table");
+        }
+
+        this.numOfPartitions = getNumOfPartitions(targetNumOfPartitions, hashtableHeaderCount, ctx.getFrameSize());
+
+        // initialize the aggregate state
+        this.partitionAggregateStates = new AggregateState[numOfPartitions];
+        for (int i = 0; i < numOfPartitions; i++) {
+            partitionAggregateStates[i] = aggregator.createAggregateStates();
+        }
+
+        // initialize the header pages: assign a block of frames for hashtable headers
+        this.hashtableHeaders = new int[hashtableHeaderCount];
+        int headerFrameIndex = allocateFrame();
+        this.hashtableHeaders[0] = headerFrameIndex;
+        for (int i = 1; i < hashtableHeaderCount; i++) {
+            int newHeaderFrameIndex = allocateFrameForOverflowing(headerFrameIndex);
+            this.hashtableHeaders[i] = newHeaderFrameIndex;
+            headerFrameIndex = newHeaderFrameIndex;
+        }
+        for (int i = 0; i < hashtableHeaderCount; i++) {
+            for (int j = 0; j < frameSize; j += INT_SIZE) {
+                frames[hashtableHeaders[i]].putInt(j, -1);
+            }
+        }
+
+        // initialize frame for each partition
+
+        hashtableSizeForPartition = hashtableSize / numOfPartitions;
+
+        partitionStatus = new BitSet(numOfPartitions);
+        partitionBeginningFrame = new int[numOfPartitions];
+        partitionCurrentFrame = new int[numOfPartitions];
+        partitionSizeInTuple = new int[numOfPartitions];
+        partitionSizeInFrame = new int[numOfPartitions];
+        hashtableEntrySize = new int[numOfPartitions];
+        this.spilledPartitionWriters = new RunFileWriter[numOfPartitions];
+        for (int i = 0; i < numOfPartitions; i++) {
+            int newFrame = allocateFrame();
+            if (newFrame == NO_MORE_FREE_BUFFER) {
+                throw new HyracksDataException("Not enough memory for the partition plan");
+            }
+            partitionBeginningFrame[i] = newFrame;
+            partitionCurrentFrame[i] = newFrame;
+            partitionSizeInTuple[i] = 0;
+            partitionSizeInFrame[i] = 1;
+            framesNext[newFrame] = END_OF_PARTITION;
+            hashtableEntrySize[i] = hashtableSizeForPartition;
+        }
+
+        // for output
+        outputBuffer = frames[allocateFrame()];
+        LOGGER.info("Initialized Hybrid-hash-grouper: " + numOfPartitions + " Partitions, " + hashtableSize
+                + " Hashvals, " + hashtableHeaderCount + " Header Pages");
+    }
+
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputAccessor.reset(buffer);
+        int tupleCount = inputAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            int pid = aggregatePartitionComputer.partition(inputAccessor, i, numOfPartitions);
+            int hid = aggregatePartitionComputer.partition(inputAccessor, i, hashtableSize);
+            insert(inputAccessor, i, pid, hid);
+        }
+    }
+
+    private void insert(FrameTupleAccessor accessor, int tupleIndex, int pid, int hid) throws HyracksDataException {
+        if (!partitionStatus.get(pid)) {
+            insertResidentPartitionRecord(keys, accessor, tupleIndex, pid, hid, aggregator,
+                    partitionAggregateStates[pid], true);
+        } else {
+            // spilled partition
+            // Spilled partition: insert into the partition only, and flush when necessary
+            appender.reset(frames[partitionCurrentFrame[pid]], false);
+            // - build the aggregation value
+            internalTupleBuilderForInsert.reset();
+            for (int k = 0; k < keys.length; k++) {
+                internalTupleBuilderForInsert.addField(accessor, tupleIndex, keys[k]);
+            }
+            aggregator.init(internalTupleBuilderForInsert, accessor, tupleIndex, partitionAggregateStates[pid]);
+            if (!appender.appendSkipEmptyField(internalTupleBuilderForInsert.getFieldEndOffsets(),
+                    internalTupleBuilderForInsert.getByteArray(), 0, internalTupleBuilderForInsert.getSize())) {
+                flushSpilledPartition(pid);
+                partitionSizeInFrame[pid]++;
+                appender.reset(frames[partitionCurrentFrame[pid]], true);
+                if (!appender.appendSkipEmptyField(internalTupleBuilderForInsert.getFieldEndOffsets(),
+                        internalTupleBuilderForInsert.getByteArray(), 0, internalTupleBuilderForInsert.getSize())) {
+                    throw new HyracksDataException("Failed to insert a tuple into a spilled partition.");
+                }
+            }
+
+            partitionSizeInTuple[pid]++;
+        }
+    }
+
+    private void insertResidentPartitionRecord(int[] recKeys, FrameTupleAccessor accessor, int tupleIndex, int pid,
+            int hid, IAggregatorDescriptor grouper, AggregateState groupState, boolean allowSpill)
+            throws HyracksDataException {
+        // resident partition
+        // - find the header record
+        int headerFrameIndex = hid * 2 * INT_SIZE / frameSize;
+        int headerFrameOffset = hid * 2 * INT_SIZE % frameSize;
+        int entryFrameIndex = -1, entryFrameOffset = -1;
+        int pointerFrameIndex = hashtableHeaders[headerFrameIndex], pointerFrameOffset = headerFrameOffset;
+        if (headerFrameIndex >= 0) {
+            entryFrameIndex = frames[pointerFrameIndex].getInt(headerFrameOffset);
+            entryFrameOffset = frames[pointerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+            do {
+                if (entryFrameIndex < 0) {
+                    break;
+                }
+                int actualFrameIndex = getHashtablePartitionFrame(pid, entryFrameIndex);
+                ByteBuffer bufferContainsNextEntry = frames[actualFrameIndex];
+
+                hashtableEntryAccessor.reset(bufferContainsNextEntry, entryFrameOffset);
+                int c = compare(recKeys, accessor, tupleIndex, storedKeys, hashtableEntryAccessor);
+                if (c == 0) {
+                    break;
+                }
+                // move to the next entry along the linked list
+                pointerFrameIndex = actualFrameIndex;
+                pointerFrameOffset = entryFrameOffset + hashtableEntryAccessor.getTupleLength();
+
+                entryFrameIndex = bufferContainsNextEntry.getInt(pointerFrameOffset);
+                entryFrameOffset = bufferContainsNextEntry.getInt(pointerFrameOffset + INT_SIZE);
+            } while (true);
+        }
+        if (entryFrameIndex >= 0) {
+            // Found the entry; do aggregation
+            grouper.aggregate(accessor, tupleIndex, hashtableEntryAccessor, groupState);
+
+        } else {
+            // No matching entry; do insertion
+            int frameIndexToInsert = partitionSizeInFrame[pid] - 1;
+            int frameOffsetToInsert = framesOffset[partitionCurrentFrame[pid]];
+            // check whether there is enough space for the insertion
+            // - build the aggregation value
+            internalTupleBuilderForInsert.reset();
+            for (int k = 0; k < recKeys.length; k++) {
+                internalTupleBuilderForInsert.addField(accessor, tupleIndex, recKeys[k]);
+            }
+
+            int initLength = internalTupleBuilderForInsert.getSize() + grouper.getInitSize(accessor, tupleIndex);
+            int fieldCount = recKeys.length + grouper.getFieldCount();
+
+            if (frameSize - frameOffsetToInsert < fieldCount * INT_SIZE + initLength + INT_SIZE * 2) {
+
+                int newPartitionFrameIndex = allocateFrameForOverflowing(partitionCurrentFrame[pid]);
+                if (newPartitionFrameIndex == NO_MORE_FREE_BUFFER) {
+                    if (allowSpill) {
+                        // need to make some space
+                        int pidToSpill = selectPartitionToSpill();
+                        if (pidToSpill == -1) {
+                            throw new HyracksDataException("Cannot allocate memory for a resident partition.");
+                        }
+                        spillResidentPartition(pidToSpill);
+                        insert(accessor, tupleIndex, pid, hid);
+                        return;
+                    } else {
+                        throw new HyracksDataException("Not enough space for processing a resident partition");
+                    }
+                }
+
+                int currentPartFrame = partitionCurrentFrame[pid];
+                partitionCurrentFrame[pid] = newPartitionFrameIndex;
+                framesNext[newPartitionFrameIndex] = currentPartFrame;
+                framesOffset[newPartitionFrameIndex] = 0;
+                partitionSizeInFrame[pid]++;
+
+                frameIndexToInsert = partitionSizeInFrame[pid] - 1;
+                frameOffsetToInsert = framesOffset[partitionCurrentFrame[pid]];
+                if (frameSize - frameOffsetToInsert < fieldCount * INT_SIZE + initLength + INT_SIZE * 2) {
+                    throw new HyracksDataException("Failed to allocate memory for an aggregation tuple.");
+                }
+            }
+
+            frames[pointerFrameIndex].putInt(pointerFrameOffset, frameIndexToInsert);
+            frames[pointerFrameIndex].putInt(pointerFrameOffset + INT_SIZE, frameOffsetToInsert);
+
+            grouper.init(internalTupleBuilderForInsert, accessor, tupleIndex, groupState);
+
+            for (int i = 0; i < internalTupleBuilderForInsert.getFieldEndOffsets().length; i++) {
+                frames[partitionCurrentFrame[pid]].putInt(frameOffsetToInsert + i * INT_SIZE,
+                        internalTupleBuilderForInsert.getFieldEndOffsets()[i]);
+            }
+            System.arraycopy(internalTupleBuilderForInsert.getByteArray(), 0,
+                    frames[partitionCurrentFrame[pid]].array(),
+                    frameOffsetToInsert + internalTupleBuilderForInsert.getFieldEndOffsets().length * INT_SIZE,
+                    internalTupleBuilderForInsert.getSize());
+
+            int linkedListRefOffset = frameOffsetToInsert + internalTupleBuilderForInsert.getFieldEndOffsets().length
+                    * INT_SIZE + internalTupleBuilderForInsert.getSize();
+
+            frames[partitionCurrentFrame[pid]].putInt(linkedListRefOffset, -1);
+            frames[partitionCurrentFrame[pid]].putInt(linkedListRefOffset + INT_SIZE, -1);
+
+            // update the frame offset
+            framesOffset[partitionCurrentFrame[pid]] += internalTupleBuilderForInsert.getFieldEndOffsets().length
+                    * INT_SIZE + internalTupleBuilderForInsert.getSize() + 2 * INT_SIZE;
+
+            partitionSizeInTuple[pid]++;
+        }
+    }
+
+    public void close() throws HyracksDataException {
+        // flush all resident partitions
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (!partitionStatus.get(i)) {
+                flushResidentPartition(i, finalWriter, aggregator, partitionAggregateStates[i], false);
+                residentPartitionCount++;
+            } else {
+                if (frames[partitionCurrentFrame[i]].getInt(frames[partitionCurrentFrame[i]].capacity() - 4) > 0)
+                    flushSpilledPartition(i);
+                recycleFrame(partitionCurrentFrame[i]);
+                partitionCurrentFrame[i] = UNALLOCATED_FRAME;
+                partitionSizeInFrame[i]++;
+                if (spilledPartitionWriters[i] != null) {
+                    spilledPartitionWriters[i].close();
+                }
+                spilledPartitionCount++;
+            }
+        }
+
+        partitionAggregateStates = null;
+
+        if (this.mergeState == null)
+            this.mergeState = merger.createAggregateStates();
+
+        // clean up the header frames
+        for (int i = 0; i < hashtableHeaders.length; i++) {
+            recycleFrame(hashtableHeaders[i]);
+        }
+
+        hashtableHeaders = null;
+
+        // Select spilled partition to reload
+        RunFileReader runReader;
+        int runFileBufferIndex = allocateFrame();
+        ByteBuffer runFileBuffer = frames[runFileBufferIndex];
+
+        int newHeadPagesCount = getHeaderPageCount(hashtableSize / numOfPartitions, frameSize);
+
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (partitionStatus.get(i)) {
+                mergeState.reset();
+                // check if the partition can be maintained in the main memory
+                if (partitionSizeInFrame[i] > 0 && partitionSizeInFrame[i] + newHeadPagesCount < freeFramesCounter) {
+                    reloadSpilledPartitionCount++;
+                    LOGGER.warning("HybridHashGrouper-Merge " + partitionSizeInFrame[i] + " " + partitionSizeInTuple[i]);
+
+                    partitionCurrentFrame[i] = allocateFrame();
+                    if (partitionCurrentFrame[i] < 0) {
+                        throw new HyracksDataException("Cannot allocate frame for merging.");
+                    }
+                    framesOffset[partitionCurrentFrame[i]] = 0;
+                    // reset the size counter
+                    partitionSizeInFrame[i] = 1;
+                    partitionSizeInTuple[i] = 0;
+
+                    framesNext[partitionCurrentFrame[i]] = END_OF_PARTITION;
+
+                    // initialize the hash table header pages
+                    hashtableHeaders = new int[newHeadPagesCount];
+                    int headerFrameIndex = allocateFrame();
+                    this.hashtableHeaders[0] = headerFrameIndex;
+                    for (int k = 0; k < frameSize; k += INT_SIZE) {
+                        frames[this.hashtableHeaders[0]].putInt(k, -1);
+                    }
+                    for (int j = 1; j < hashtableHeaders.length; j++) {
+                        int newHeaderFrameIndex = allocateFrameForOverflowing(headerFrameIndex);
+                        this.hashtableHeaders[j] = newHeaderFrameIndex;
+                        headerFrameIndex = newHeaderFrameIndex;
+                        for (int k = 0; k < frameSize; k += INT_SIZE) {
+                            frames[newHeaderFrameIndex].putInt(k, -1);
+                        }
+                    }
+
+                    runReader = spilledPartitionWriters[i].createReader();
+                    runReader.open();
+                    if (mergeAccessor == null) {
+                        mergeAccessor = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+                    }
+                    while (runReader.nextFrame(runFileBuffer)) {
+                        mergeAccessor.reset(runFileBuffer);
+                        int tupleCount = mergeAccessor.getTupleCount();
+                        for (int j = 0; j < tupleCount; j++) {
+                            merge(mergeAccessor, j, i);
+                        }
+                    }
+                    // close the run file reader and writer
+                    runReader.close();
+                    spilledPartitionWriters[i].close();
+                    spilledPartitionWriters[i] = null;
+                    // flush the in-memory hash table into the output
+                    flushResidentPartition(i, finalWriter, merger, mergeState, false);
+                }
+            }
+        }
+        LOGGER.warning("HybridHashGrouper " + residentPartitionCount + " " + spilledPartitionCount + " "
+                + reloadSpilledPartitionCount);
+    }
+
+    private void merge(FrameTupleAccessor accessor, int tupleIndex, int pid) throws HyracksDataException {
+        int hid = mergePartitionComputer.partition(accessor, tupleIndex, hashtableEntrySize[pid]);
+        insertResidentPartitionRecord(storedKeys, accessor, tupleIndex, pid, hid, merger, mergeState, false);
+    }
+
+    protected void spillResidentPartition(int pid) throws HyracksDataException {
+        // flush the data partition
+        RunFileWriter writer = spilledPartitionWriters[pid];
+        if (writer == null) {
+            writer = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+                    AbstractImprovedHybridHashGrouper.class.getSimpleName()), ctx.getIOManager());
+            writer.open();
+            spilledPartitionWriters[pid] = writer;
+        }
+        flushResidentPartition(pid, writer, aggregator, partitionAggregateStates[pid], true);
+        partitionStatus.set(pid);
+    }
+
+    protected void flushSpilledPartition(int pid) throws HyracksDataException {
+        RunFileWriter writer = spilledPartitionWriters[pid];
+        if (writer == null) {
+            writer = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+                    AbstractImprovedHybridHashGrouper.class.getName()), ctx.getIOManager());
+            writer.open();
+            spilledPartitionWriters[pid] = writer;
+        }
+        flushSpilledPartitionToWriter(pid, writer);
+    }
+
+    private void flushSpilledPartitionToWriter(int pid, IFrameWriter writer) throws HyracksDataException {
+        int spillPartFrame = partitionCurrentFrame[pid];
+        int firstPartFrame = spillPartFrame;
+        appender.reset(outputBuffer, true);
+        while (spillPartFrame != END_OF_PARTITION) {
+            partitionAccessor.reset(frames[spillPartFrame]);
+            int tupleCount = partitionAccessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                internalTupleBuilderForFlush.reset();
+                for (int k = 0; k < storedKeys.length; k++) {
+                    internalTupleBuilderForFlush.addField(partitionAccessor, i, storedKeys[k]);
+                }
+                aggregator.outputPartialResult(internalTupleBuilderForFlush, partitionAccessor, i,
+                        partitionAggregateStates[pid]);
+                if (!appender.appendSkipEmptyField(internalTupleBuilderForFlush.getFieldEndOffsets(),
+                        internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+                    FrameUtils.flushFrame(outputBuffer, writer);
+                    appender.reset(outputBuffer, true);
+                    if (!appender.appendSkipEmptyField(internalTupleBuilderForFlush.getFieldEndOffsets(),
+                            internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+                        throw new HyracksDataException("Failed to flush a spilled partition.");
+                    }
+                }
+            }
+
+            framesOffset[spillPartFrame] = 0;
+            int nextFrame = framesNext[spillPartFrame];
+            if (spillPartFrame != firstPartFrame) {
+                recycleFrame(spillPartFrame);
+            }
+            spillPartFrame = nextFrame;
+        }
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outputBuffer, writer);
+        }
+        frames[firstPartFrame].clear();
+        frames[firstPartFrame].putInt(frames[firstPartFrame].capacity() - 4, 0);
+        framesNext[firstPartFrame] = END_OF_PARTITION;
+        // reset the aggregate state
+        partitionAggregateStates[pid].reset();
+    }
+
+    private void flushResidentPartition(int pid, IFrameWriter writer, IAggregatorDescriptor grouper,
+            AggregateState groupState, boolean isPartial) throws HyracksDataException {
+        int residentPartFrame = partitionCurrentFrame[pid];
+        int firstPartFrame = residentPartFrame;
+        appender.reset(outputBuffer, true);
+        while (residentPartFrame != END_OF_PARTITION) {
+            int frameOffset = 0;
+
+            while (frameOffset < framesOffset[residentPartFrame]) {
+                hashtableEntryAccessor.reset(frames[residentPartFrame], frameOffset);
+                int tupleLength = hashtableEntryAccessor.getTupleLength();
+                int fieldOffsetLength = hashtableEntryAccessor.getFieldCount() * INT_SIZE;
+                internalTupleBuilderForFlush.reset();
+
+                for (int k = 0; k < storedKeys.length; k++) {
+                    internalTupleBuilderForFlush.addField(frames[residentPartFrame].array(), frameOffset
+                            + fieldOffsetLength + hashtableEntryAccessor.getFieldStartOffset(k),
+                            hashtableEntryAccessor.getFieldLength(k));
+                }
+
+                if (isPartial)
+                    grouper.outputPartialResult(internalTupleBuilderForFlush, frames[residentPartFrame].array(),
+                            frameOffset, tupleLength, hashtableEntryAccessor.getFieldCount(), INT_SIZE, groupState);
+                else
+                    grouper.outputFinalResult(internalTupleBuilderForFlush, frames[residentPartFrame].array(),
+                            frameOffset, tupleLength, hashtableEntryAccessor.getFieldCount(), INT_SIZE, groupState);
+
+                if (!appender.append(internalTupleBuilderForFlush.getFieldEndOffsets(),
+                        internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+                    FrameUtils.flushFrame(outputBuffer, writer);
+                    appender.reset(outputBuffer, true);
+                    if (!appender.append(internalTupleBuilderForFlush.getFieldEndOffsets(),
+                            internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+                        throw new HyracksDataException("Failed to flush a spilled partition.");
+                    }
+                }
+
+                frameOffset += tupleLength + INT_SIZE * 2;
+            }
+
+            // release the frame
+            framesOffset[residentPartFrame] = 0;
+            int nextFrame = framesNext[residentPartFrame];
+            if (residentPartFrame != firstPartFrame) {
+                recycleFrame(residentPartFrame);
+            }
+            residentPartFrame = nextFrame;
+        }
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outputBuffer, writer);
+        }
+        frames[firstPartFrame].clear();
+        frames[firstPartFrame].putInt(frames[firstPartFrame].capacity() - 4, 0);
+        framesNext[firstPartFrame] = END_OF_PARTITION;
+    }
+
+    /**
+     * Get the number of header pages for the built-in hashtable.
+     * <p/>
+     * The number of header pages for the hashtable is calculated by dividing the total memory usage of all hashtable
+     * keys over the frame size. Each hashtable key takes two integer-length fields, indicating the frame index and the
+     * tuple index of the first entry with this hash key value.
+     * <p/>
+     * 
+     * @param hashtableSize
+     * @param frameSize
+     * @return
+     */
+    private int getHeaderPageCount(int hashtableSize, int frameSize) {
+        int hashtableHeaderPageResidual = (hashtableSize % frameSize) * (INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
+        return hashtableSize / frameSize * (INT_SIZE * 2) + hashtableHeaderPageResidual;
+    }
+
+    /**
+     * Get the number of partitions.
+     * <p/>
+     * The number of partition is decided by the target number of partitions, and also the size of the hashtable. The
+     * goal is to contain the whole hash table header, and for each partition at least one page can be assigned.
+     * <p/>
+     * In case that the target number of partitions is too large, or the available frame size is too small, the number
+     * of partition will be adjusted to be the available frames except for the headers and other reversed frames.
+     * <p/>
+     * 
+     * @param targetNumOfPartitions
+     * @param hashtableHeaderPageCount
+     * @param frameSize
+     * @return
+     */
+    private int getNumOfPartitions(int targetNumOfPartitions, int hashtableHeaderPageCount, int frameSize) {
+        if (targetNumOfPartitions + hashtableHeaderPageCount + getReservedFramesCount() > framesLimit) {
+            return framesLimit - getReservedFramesCount() - hashtableHeaderPageCount;
+        } else {
+            return targetNumOfPartitions;
+        }
+    }
+
+    private void recycleFrame(int frameIdx) {
+        frames[frameIdx].clear();
+        framesOffset[frameIdx] = 0;
+        frames[frameIdx].putInt(frames[frameIdx].capacity() - 4, 0);
+        int oldFreeHead = nextFreeFrameIndex;
+        nextFreeFrameIndex = frameIdx;
+        framesNext[nextFreeFrameIndex] = oldFreeHead;
+        freeFramesCounter++;
+    }
+
+    /**
+     * Allocate frame in case of the given overflowing frame. The allocated frame
+     * will become the head of the linked list, and its next pointer points to the
+     * overflowing frame.
+     */
+    @Override
+    public int allocateFrameForOverflowing(int frameIndex) throws HyracksDataException {
+        int newFrameIndex = allocateFrame();
+        if (newFrameIndex < 0)
+            return newFrameIndex;
+        framesNext[newFrameIndex] = frameIndex;
+        return newFrameIndex;
+    }
+
+    /**
+     * Allocate a free frame.
+     * <p/>
+     * If {@link #nextFreeFrameIndex} points to an unallocated frame slot, the frame is initialized and returned. Then
+     * the next frame of the free frame replaces the first free page {@link #nextFreeFrameIndex}.
+     * <p/>
+     * When there is no more free frame available, {@link #nextFreeFrameIndex} will point to
+     * {@link #NO_MORE_FREE_BUFFER}.
+     * <p/>
+     * 
+     * @return
+     * @throws HyracksDataException
+     */
+    @Override
+    public int allocateFrame() throws HyracksDataException {
+        if (nextFreeFrameIndex != NO_MORE_FREE_BUFFER) {
+            int freeFrameIndex = nextFreeFrameIndex;
+            if (frames[freeFrameIndex] == null) {
+                frames[freeFrameIndex] = ctx.allocateFrame();
+            }
+            int oldNext = framesNext[freeFrameIndex];
+            framesNext[freeFrameIndex] = INVALID_BUFFER;
+            if (oldNext == UNALLOCATED_FRAME) {
+                nextFreeFrameIndex++;
+                if (nextFreeFrameIndex == framesLimit) { // No more free buffer
+                    nextFreeFrameIndex = NO_MORE_FREE_BUFFER;
+                }
+            } else {
+                nextFreeFrameIndex = oldNext;
+            }
+            (frames[freeFrameIndex]).clear();
+
+            freeFramesCounter--;
+            if (freeFramesCounter < 0) {
+                throw new HyracksDataException("Memory underflow!");
+            }
+            return freeFrameIndex;
+        } else {
+            return NO_MORE_FREE_BUFFER; // A partitions needs to be spilled (if feasible)
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IFrameAllocator#getFrame(int)
+     */
+    @Override
+    public byte[] getFrame(int frameIndex) throws HyracksDataException {
+        if (frameIndex < 0 || frameIndex > frames.length) {
+            return null;
+        }
+        return frames[frameIndex].array();
+    }
+
+    private int compare(int[] keys0, FrameTupleAccessor accessor0, int tIndex0, int[] keys1,
+            TupleInFrameAccessor accessor1) {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset();
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < keys0.length; ++i) {
+            int fIdx0 = keys0[i];
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fIdx1 = keys1[i];
+            int fStart1 = accessor1.getFieldStartOffset(fIdx1);
+            int fEnd1 = accessor1.getFieldEndOffset(fIdx1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private int getHashtablePartitionFrame(int pid, int frameIndex) {
+        if (frameIndex > partitionSizeInFrame[pid] - 1) {
+            return -1;
+        }
+        int fIdx = partitionCurrentFrame[pid];
+        for (int i = partitionSizeInFrame[pid] - 1; i > frameIndex; i--) {
+            fIdx = framesNext[fIdx];
+        }
+        return fIdx;
+    }
+
+    public RunFileWriter getSpilledRunWriter(int pid) {
+        if (pid >= 0 && pid < numOfPartitions) {
+            return spilledPartitionWriters[pid];
+        }
+        return null;
+    }
+
+    public int getPartitionSizeInFrame(int pid) {
+        if (pid >= 0 && pid < numOfPartitions) {
+            return partitionSizeInFrame[pid];
+        }
+        return -1;
+    }
+
+    public boolean hasSpilledPartition() {
+        boolean hasSpilledPart = false;
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (spilledPartitionWriters[i] != null) {
+                hasSpilledPart = true;
+                break;
+            }
+        }
+        return hasSpilledPart;
+    }
+
+    public int getPartitionMaxFrameSize() {
+        int maxSize = 0;
+        for (int i = partitionStatus.nextSetBit(0); i >= 0; i = partitionStatus.nextSetBit(i + 1)) {
+            if (partitionSizeInFrame[i] > maxSize) {
+                maxSize = partitionSizeInFrame[i];
+            }
+        }
+        return maxSize;
+    }
+
+    public int getNumOfPartitions() {
+        return numOfPartitions;
+    }
+
+    abstract public int getReservedFramesCount();
+
+    abstract public int selectPartitionToSpill();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java
new file mode 100644
index 0000000..6897505
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class ExternalHashGrouperAggregate {
+
+    private static final Logger LOGGER = Logger.getLogger(ExternalHashGrouperAggregate.class.getName());
+
+    private final int[] keyFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+
+    private final int framesLimit;
+    private final ISpillableTableFactory spillableTableFactory;
+    private final boolean isOutputSorted;
+
+    private final RecordDescriptor inRecDesc, outRecDesc;
+
+    private final IHyracksTaskContext ctx;
+
+    private final boolean skipSort;
+
+    private final boolean skipRun;
+
+    private final int tableSize;
+
+    /*
+     * For aggregate phase
+     */
+
+    LinkedList<RunFileReader> runs;
+    ISpillableTable gTable;
+    FrameTupleAccessor inFrameAccessor;
+
+    /*
+     * For instrumenting
+     */
+    long ioCounter = 0;
+    long sortTimeCounter = 0;
+    long aggTimeStart = 0;
+    long frameFlushRequest = 0;
+    long frameProcessingTime = 0;
+    long frameFlushTime = 0;
+
+    /*
+     * For output
+     */
+    private final IFrameWriter finalWriter;
+
+    /**
+     * @param ctx
+     * @param keyFields
+     * @param framesLimit
+     * @param tableSize
+     * @param comparatorFactories
+     * @param firstNormalizerFactory
+     * @param aggregatorFactory
+     * @param mergerFactory
+     * @param inRecDesc
+     * @param outRecDesc
+     * @param spillableTableFactory
+     * @param isOutputSorted
+     */
+    public ExternalHashGrouperAggregate(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
+            ISpillableTableFactory spillableTableFactory, IFrameWriter finalWriter, boolean isOutputSorted) {
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.aggregatorFactory = aggregatorFactory;
+        this.framesLimit = framesLimit;
+        this.spillableTableFactory = spillableTableFactory;
+        this.isOutputSorted = isOutputSorted;
+        this.tableSize = tableSize;
+        this.inRecDesc = inRecDesc;
+        this.outRecDesc = outRecDesc;
+        this.ctx = ctx;
+        this.finalWriter = finalWriter;
+
+        this.skipSort = false;
+        this.skipRun = false;
+    }
+
+    public void initGrouperForAggregate() throws HyracksDataException {
+        runs = new LinkedList<RunFileReader>();
+        gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories, firstNormalizerFactory,
+                aggregatorFactory, inRecDesc, outRecDesc, tableSize, framesLimit);
+        gTable.reset();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+    }
+
+    public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int tupleCount = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            if (!gTable.insert(inFrameAccessor, i)) {
+                flushFramesToRun();
+            }
+        }
+    }
+
+    private void flushFramesToRun() throws HyracksDataException {
+        frameFlushRequest++;
+        long frameFlushTimer = System.currentTimeMillis();
+
+        if (!skipSort) {
+            long sortStart = System.currentTimeMillis();
+            gTable.sortFrames();
+            sortTimeCounter += System.currentTimeMillis() - sortStart;
+        }
+
+        if (!skipRun) {
+            FileReference runFile;
+            try {
+                runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+                        ExternalGroupOperatorDescriptor.class.getSimpleName());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+            writer.open();
+            try {
+                gTable.flushFrames(writer, true);
+            } catch (Exception ex) {
+                throw new HyracksDataException(ex);
+            } finally {
+                writer.close();
+                ioCounter += writer.getFileSize();
+            }
+            runs.add(((RunFileWriter) writer).createReader());
+        }
+        gTable.reset();
+        frameFlushTime = System.currentTimeMillis() - frameFlushTimer;
+    }
+
+    public void finishAggregation() throws HyracksDataException {
+        gTable.finishup(isOutputSorted);
+        if (gTable.getFrameCount() >= 0 && runs.size() > 0) {
+            flushFramesToRun();
+            gTable.close();
+            gTable = null;
+        } else if (gTable.getFrameCount() >= 0 && runs.size() <= 0) {
+            // Write the output
+            gTable.flushFrames(finalWriter, false);
+        }
+        LOGGER.warning("[C]Hybrid AggregateActivity - RunIO    " + ioCounter);
+        LOGGER.warning("[C]Hybrid AggregateActivity - FlushFrames    " + frameFlushRequest);
+        LOGGER.warning("[T]Hybrid AggregateActivity - Sort    " + sortTimeCounter);
+        LOGGER.warning("[T]Hybrid AggregateActivity - ProcessFrames    " + frameProcessingTime);
+        LOGGER.warning("[T]Hybrid AggregateActivity - FlushFrames    " + frameFlushTime);
+        LOGGER.warning("[T]Hybrid AggregateActivity    " + (System.currentTimeMillis() - aggTimeStart));
+    }
+
+    public LinkedList<RunFileReader> getRunReaders() {
+        return runs;
+    }
+
+    public ISpillableTable getGroupTable() {
+        return gTable;
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java
new file mode 100644
index 0000000..5cdec97
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java
@@ -0,0 +1,461 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class ExternalHashGrouperMerge {
+
+    private static final Logger LOGGER = Logger.getLogger(ExternalHashGrouperMerge.class.getName());
+
+    private final int[] keyFields;
+    private final IBinaryComparator[] comparators;
+
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+
+    private final int framesLimit;
+
+    private final boolean isOutputSorted;
+
+    private final RecordDescriptor inRecDesc;
+
+    private final IHyracksTaskContext ctx;
+
+    private final ArrayTupleBuilder tupleBuilder;
+
+    private final IFrameWriter outputWriter;
+
+    List<ByteBuffer> inFrames;
+    ByteBuffer outFrame, writerFrame;
+    FrameTupleAppender outAppender, writerAppender;
+    LinkedList<RunFileReader> runs;
+    ArrayTupleBuilder finalTupleBuilder;
+    FrameTupleAccessor outFrameAccessor;
+    int[] currentFrameIndexInRun, currentRunFrames;
+    int runFrameLimit = 1;
+
+    // For instrumenting
+    long ioCounter;
+
+    public ExternalHashGrouperMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, boolean isOutputSorted, IFrameWriter outputWriter)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        this.framesLimit = framesLimit;
+
+        this.keyFields = keyFields;
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparators.length; i++) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        aggregator = aggregatorFactory.createAggregator(ctx, inRecDesc, outRecDesc, keyFields, keyFields);
+        aggregateState = aggregator.createAggregateStates();
+
+        this.inRecDesc = inRecDesc;
+        this.isOutputSorted = isOutputSorted;
+
+        this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
+
+        this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
+
+        this.outputWriter = outputWriter;
+
+        this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+    }
+
+    public void initialize(ISpillableTable gTable, LinkedList<RunFileReader> runFiles) throws HyracksDataException {
+        long mergeTimer = System.currentTimeMillis();
+        runs = runFiles;
+        outputWriter.open();
+        try {
+            if (runs.size() <= 0) {
+                if (gTable != null) {
+                    if (isOutputSorted)
+                        gTable.sortFrames();
+                    gTable.flushFrames(outputWriter, false);
+                    gTable.close();
+                    gTable = null;
+                }
+            } else {
+                LOGGER.warning("[C]Hybrid MergeActivity - Runs    " + runs.size());
+                runs = new LinkedList<RunFileReader>(runs);
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.allocateFrame();
+                outAppender.reset(outFrame, true);
+                outFrameAccessor.reset(outFrame);
+                while (runs.size() > 0) {
+                    try {
+                        doPass(runs);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                inFrames.clear();
+            }
+        } catch (Exception e) {
+            outputWriter.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            aggregateState.close();
+            LOGGER.warning("[C]Hybrid MergeActivity - RunIO    " + ioCounter);
+            LOGGER.warning("[T]Hybrid MergeActivity    " + (System.currentTimeMillis() - mergeTimer));
+        }
+    }
+
+    private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+        long mergePassTimer = System.currentTimeMillis();
+        long processedRecords = 0;
+        long groupCreated = 0;
+        FileReference newRun = null;
+        IFrameWriter writer = outputWriter;
+        boolean finalPass = false;
+
+        while (inFrames.size() + 2 < framesLimit) {
+            inFrames.add(ctx.allocateFrame());
+        }
+        int runNumber;
+        if (runs.size() + 2 <= framesLimit) {
+            finalPass = true;
+            runFrameLimit = (framesLimit - 2) / runs.size();
+            runNumber = runs.size();
+        } else {
+            runNumber = framesLimit - 2;
+            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+                    ExternalGroupOperatorDescriptor.class.getSimpleName());
+            writer = new RunFileWriter(newRun, ctx.getIOManager());
+            writer.open();
+        }
+        try {
+            currentFrameIndexInRun = new int[runNumber];
+            currentRunFrames = new int[runNumber];
+            /**
+             * Create file readers for each input run file, only for
+             * the ones fit into the inFrames
+             */
+            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), inRecDesc, runNumber,
+                    comparator);
+            /**
+             * current tuple index in each run
+             */
+            int[] tupleIndices = new int[runNumber];
+
+            for (int i = 0; i < runNumber; i++) {
+                int runIndex = topTuples.peek().getRunid();
+                tupleIndices[runIndex] = 0;
+                // Load the run file
+                runFileReaders[runIndex] = runs.get(runIndex);
+                runFileReaders[runIndex].open();
+
+                currentRunFrames[runIndex] = 0;
+                currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+                for (int j = 0; j < runFrameLimit; j++) {
+                    int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                    if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                        currentRunFrames[runIndex]++;
+                        if (j == 0)
+                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            /**
+             * Start merging
+             */
+            while (!topTuples.areRunsExhausted()) {
+                processedRecords++;
+                /**
+                 * Get the top record
+                 */
+                ReferenceEntry top = topTuples.peek();
+                int tupleIndex = top.getTupleIndex();
+                int runIndex = topTuples.peek().getRunid();
+
+                FrameTupleAccessor fta = top.getAccessor();
+
+                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                if (currentTupleInOutFrame < 0
+                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+
+                    groupCreated++;
+
+                    /**
+                     * Initialize the first output record Reset the
+                     * tuple builder
+                     */
+
+                    tupleBuilder.reset();
+
+                    for (int k = 0; k < keyFields.length; k++) {
+                        tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
+                    }
+
+                    aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
+
+                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                        flushOutFrame(writer, finalPass);
+                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "The partial result is too large to be initialized in a frame.");
+                        }
+                    }
+
+                } else {
+                    /**
+                     * if new tuple is in the same group of the
+                     * current aggregator do merge and output to the
+                     * outFrame
+                     */
+
+                    aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+
+                }
+                tupleIndices[runIndex]++;
+                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+            }
+
+            if (outAppender.getTupleCount() > 0) {
+                flushOutFrame(writer, finalPass);
+                outAppender.reset(outFrame, true);
+            }
+
+            aggregator.close();
+
+            runs.subList(0, runNumber).clear();
+            /**
+             * insert the new run file into the beginning of the run
+             * file list
+             */
+            if (!finalPass) {
+                runs.add(0, ((RunFileWriter) writer).createReader());
+                ioCounter += ((RunFileWriter) writer).getFileSize();
+            }
+        } finally {
+            if (!finalPass) {
+                writer.close();
+            }
+        }
+        LOGGER.warning("[TCC]Hybrid - Merge - doPass - " + (System.currentTimeMillis() - mergePassTimer) + " - "
+                + processedRecords + " - " + groupCreated);
+    }
+
+    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+        if (finalTupleBuilder == null) {
+            finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFields().length);
+        }
+
+        if (writerFrame == null) {
+            writerFrame = ctx.allocateFrame();
+        }
+
+        if (writerAppender == null) {
+            writerAppender = new FrameTupleAppender(ctx.getFrameSize()); 
+        }
+        writerAppender.reset(writerFrame, true);
+
+        outFrameAccessor.reset(outFrame);
+
+        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+            finalTupleBuilder.reset();
+
+            for (int k = 0; k < keyFields.length; k++) {
+                finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
+            }
+
+            if (isFinal) {
+
+                aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+
+            } else {
+
+                aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+            }
+
+            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                FrameUtils.flushFrame(writerFrame, writer);
+                writerAppender.reset(writerFrame, true);
+                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+                }
+            }
+        }
+        if (writerAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writerFrame, writer);
+            writerAppender.reset(writerFrame, true);
+        }
+
+        outAppender.reset(outFrame, true);
+    }
+
+    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+        int runStart = runIndex * runFrameLimit;
+        boolean existNext = false;
+        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+            /**
+             * run already closed
+             */
+            existNext = false;
+        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+            /**
+             * not the last frame for this run
+             */
+            existNext = true;
+            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+                tupleIndices[runIndex] = 0;
+                currentFrameIndexInRun[runIndex]++;
+            }
+        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+            /**
+             * the last frame has expired
+             */
+            existNext = true;
+        } else {
+            /**
+             * If all tuples in the targeting frame have been
+             * checked.
+             */
+            tupleIndices[runIndex] = 0;
+            currentFrameIndexInRun[runIndex] = runStart;
+            /**
+             * read in batch
+             */
+            currentRunFrames[runIndex] = 0;
+            for (int j = 0; j < runFrameLimit; j++) {
+                int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                    existNext = true;
+                    currentRunFrames[runIndex]++;
+                } else {
+                    break;
+                }
+            }
+        }
+
+        if (existNext) {
+            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Close the run file, and also the corresponding readers and
+     * input frame.
+     * 
+     * @param index
+     * @param runCursors
+     * @param tupleAccessor
+     * @throws HyracksDataException
+     */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+            runCursors[index] = null;
+            int frameOffset = index * runFrameLimit;
+            for (int j = 0; j < runFrameLimit; j++) {
+                tupleAccessor[frameOffset + j] = null;
+            }
+        }
+    }
+
+    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+        byte[] b1 = fta1.getBuffer().array();
+        byte[] b2 = fta2.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = f;
+            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+            int l1 = fta1.getFieldLength(j1, fIdx);
+            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+            int l2_start = fta2.getFieldStartOffset(j2, fIdx);
+            int l2_end = fta2.getFieldEndOffset(j2, fIdx);
+            int l2 = l2_end - l2_start;
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+
+            @Override
+            public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+                int j1 = o1.getTupleIndex();
+                int j2 = o2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < keyFields.length; ++f) {
+                    int fIdx = f;
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 87bc534..b59db51 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -109,5 +109,12 @@
             int fieldCount, int fieldSlotLength, AggregateState state) throws HyracksDataException;
 
     public void close();
+    
+    public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+    
+    public int getFieldCount();
+    
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+            AggregateState state) throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index a34fa73..2c73136 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -116,5 +116,7 @@
      * Close the field aggregator
      */
     public void close();
+    
+    public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java
new file mode 100644
index 0000000..b952c40
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameAllocator {
+
+    public int allocateFrameForOverflowing(int frameIndex) throws HyracksDataException;
+
+    public int allocateFrame() throws HyracksDataException;
+
+    public byte[] getFrame(int frameIndex) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java
new file mode 100644
index 0000000..39cd0e8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IImprovedHybridHashGroupFactory extends Serializable {
+
+    public AbstractImprovedHybridHashGrouper createHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit,
+            int[] keys, int[] storedKeys, int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc,
+            RecordDescriptor outRecDesc, IBinaryComparator[] comparators,
+            ITuplePartitionComputer aggregateHashtablePartitionComputer,
+            ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java
new file mode 100644
index 0000000..d29ce05
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ImprovedHybridHashGroupFactory implements IImprovedHybridHashGroupFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IImprovedHybridHashGroupFactory#createHybridHashGrouper(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, int, int[], int[], int, int, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator[], edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer, edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer, edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.api.comm.IFrameWriter)
+     */
+    @Override
+    public AbstractImprovedHybridHashGrouper createHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit,
+            int[] keys, int[] storedKeys, int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc,
+            RecordDescriptor outRecDesc, IBinaryComparator[] comparators,
+            ITuplePartitionComputer aggregateHashtablePartitionComputer,
+            ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException {
+        return new AbstractImprovedHybridHashGrouper(ctx, framesLimit, keys, storedKeys, targetNumOfPartitions,
+                hashtableSize, inRecDesc, outRecDesc, comparators, aggregateHashtablePartitionComputer,
+                mergeHashtablePartitionComputer, aggregatorFactory, mergerFactory, finalOutputWriter) {
+
+            @Override
+            public int getReservedFramesCount() {
+                // use one frame for output buffer
+                return 1;
+            }
+
+            /**
+             * Pick the resident partition with largest size (in frame).
+             */
+            @Override
+            public int selectPartitionToSpill() {
+                int pid = -1;
+                for (int i = 0; i < numOfPartitions; i++) {
+                    if (!partitionStatus.get(i)) {
+                        // only pick resident partition
+                        if (pid < 0) {
+                            pid = i;
+                        } else {
+                            if (partitionSizeInFrame[i] > partitionSizeInFrame[pid])
+                                pid = i;
+                        }
+                    }
+                }
+                return pid;
+            }
+        };
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..294366d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class ImprovedHybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private final int framesLimit;
+    private final int inputSize;
+    private final double factor;
+    private final int[] keys;
+    private final int hashtableSize;
+
+    private final IBinaryHashFunctionFamily[] hashFunctionFamilies;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    private final IImprovedHybridHashGroupFactory hybridHashGrouperFactory;
+
+    private final IAggregatorDescriptorFactory aggregatorFactory, mergerFactory;
+
+    private final static Logger LOGGER = Logger.getLogger(ImprovedHybridHashGroupOperatorDescriptor.class
+            .getSimpleName());
+
+    private static final double HYBRID_SWITCH_THRESHOLD = 0.8;
+
+    public ImprovedHybridHashGroupOperatorDescriptor(JobSpecification spec, int framesLimit, int inputSize,
+            double factor, int hashtableSize, int[] keys, IBinaryHashFunctionFamily[] hashFunctionFamilies,
+            IBinaryComparatorFactory[] comparatorFactories, IImprovedHybridHashGroupFactory hybridHashGrouperFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor outRecordDescriptor) {
+        super(spec, 1, 1);
+        this.inputSize = inputSize;
+        this.framesLimit = framesLimit;
+        this.factor = factor;
+        this.keys = keys;
+        this.hashtableSize = hashtableSize;
+
+        this.hashFunctionFamilies = hashFunctionFamilies;
+        this.comparatorFactories = comparatorFactories;
+
+        this.hybridHashGrouperFactory = hybridHashGrouperFactory;
+
+        this.aggregatorFactory = aggregatorFactory;
+        this.mergerFactory = mergerFactory;
+
+        recordDescriptors[0] = outRecordDescriptor;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.api.dataflow.IActivity#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions)
+            throws HyracksDataException {
+
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+
+        final ITuplePartitionComputer aggregateTpc = new FieldHashPartitionComputerFamily(keys, hashFunctionFamilies)
+                .createPartitioner(0);
+
+        // Keys for stored records
+        final int[] storedKeys = new int[keys.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keys.length];
+        for (int i = 0; i < keys.length; i++) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecDesc.getFields()[keys[i]];
+        }
+
+        final ITuplePartitionComputer mergeTpc = new FieldHashPartitionComputerFamily(storedKeys, hashFunctionFamilies)
+                .createPartitioner(0);
+
+        final RecordDescriptor internalRecordDescriptor;
+
+        if (keys.length >= recordDescriptors[0].getFields().length) {
+            // for the case of zero-aggregations
+            ISerializerDeserializer<?>[] fields = recordDescriptors[0].getFields();
+            ITypeTraits[] types = recordDescriptors[0].getTypeTraits();
+            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+            for (int i = 0; i < fields.length; i++)
+                newFields[i] = fields[i];
+            ITypeTraits[] newTypes = null;
+            if (types != null) {
+                newTypes = new ITypeTraits[types.length + 1];
+                for (int i = 0; i < types.length; i++)
+                    newTypes[i] = types[i];
+            }
+            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+        } else {
+            internalRecordDescriptor = recordDescriptors[0];
+        }
+
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            private AbstractImprovedHybridHashGrouper hybridHashGrouper;
+
+            private int numOfPartitions;
+
+            private ByteBuffer runLoadFrame;
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                hybridHashGrouper = hybridHashGrouperFactory.createHybridHashGrouper(ctx, framesLimit, keys,
+                        storedKeys, getNumberOfPartitions(framesLimit, inputSize, factor), hashtableSize, inRecDesc,
+                        recordDescriptors[0], comparators, aggregateTpc, mergeTpc, aggregatorFactory, mergerFactory,
+                        writer);
+                numOfPartitions = hybridHashGrouper.getNumOfPartitions();
+            }
+
+            private int getNumberOfPartitions(int framesLimit, int inputSize, double factor) {
+                int numberOfPartitions = (int) (Math.ceil((double) (inputSize * factor / nPartitions - framesLimit)
+                        / (double) (framesLimit - 1)));
+                if (numberOfPartitions <= 0) {
+                    numberOfPartitions = 1;
+                }
+                return numberOfPartitions;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                hybridHashGrouper.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                hybridHashGrouper.close();
+                // process run files
+                for (int i = hybridHashGrouper.partitionStatus.nextSetBit(0); i >= 0; i = hybridHashGrouper.partitionStatus
+                        .nextSetBit(i + 1)) {
+                    if (hybridHashGrouper.getSpilledRunWriter(i) == null) {
+                        continue;
+                    }
+                    RunFileReader runReader = hybridHashGrouper.getSpilledRunWriter(i).createReader();
+                    int runFileSizeInFrame = hybridHashGrouper.getPartitionSizeInFrame(i);
+                    processRunFile(runReader, runFileSizeInFrame, hashtableSize / numOfPartitions, 0);
+                }
+                writer.close();
+            }
+
+            private void processRunFile(RunFileReader runReader, int runFileSize, int htSize, int level)
+                    throws HyracksDataException {
+                LOGGER.warning("processRunFile " + runFileSize + " " + htSize);
+                int partitionCount = getNumberOfPartitions(framesLimit, runFileSize, factor);
+                ITuplePartitionComputer partitionComputer = new FieldHashPartitionComputerFamily(storedKeys,
+                        hashFunctionFamilies).createPartitioner(level + 1);
+                AbstractImprovedHybridHashGrouper grouper = hybridHashGrouperFactory.createHybridHashGrouper(ctx,
+                        framesLimit, storedKeys, storedKeys, partitionCount, hashtableSize, internalRecordDescriptor,
+                        recordDescriptors[0], comparators, partitionComputer, partitionComputer, mergerFactory,
+                        mergerFactory, writer);
+
+                runReader.open();
+
+                if (runLoadFrame == null) {
+                    runLoadFrame = ctx.allocateFrame();
+                }
+
+                while (runReader.nextFrame(runLoadFrame)) {
+                    grouper.nextFrame(runLoadFrame);
+                }
+
+                runReader.close();
+                grouper.close();
+
+                if (grouper.hasSpilledPartition()) {
+                    if (grouper.getPartitionMaxFrameSize() < HYBRID_SWITCH_THRESHOLD * runFileSize) {
+                        // recursive hybrid-hash-group
+                        for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+                                .nextSetBit(i + 1)) {
+                            RunFileReader runFileReader = grouper.getSpilledRunWriter(i).createReader();
+                            int runFileSizeInFrame = grouper.getPartitionSizeInFrame(i);
+                            processRunFile(runFileReader, runFileSizeInFrame, htSize / partitionCount, level + 1);
+                        }
+                    } else {
+                        // switch to external-hash-group
+                        int tableSize = 0;
+
+                        for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+                                .nextSetBit(i + 1)) {
+                            tableSize += grouper.partitionSizeInTuple[i] * 2;
+                        }
+
+                        // use external-hash-group
+                        ExternalHashGrouperAggregate aggregator = new ExternalHashGrouperAggregate(ctx, storedKeys,
+                                framesLimit, tableSize, comparatorFactories, null, mergerFactory,
+                                internalRecordDescriptor, recordDescriptors[0], new HashSpillableFrameSortTableFactory(
+                                        new ITuplePartitionComputerFactory() {
+
+                                            private static final long serialVersionUID = 1L;
+
+                                            @Override
+                                            public ITuplePartitionComputer createPartitioner() {
+                                                return new FieldHashPartitionComputerFamily(storedKeys,
+                                                        hashFunctionFamilies).createPartitioner(0);
+                                            }
+                                        }), writer, false);
+                        aggregator.initGrouperForAggregate();
+
+                        for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+                                .nextSetBit(i + 1)) {
+                            RunFileReader rReader = grouper.getSpilledRunWriter(i).createReader();
+                            rReader.open();
+                            while (rReader.nextFrame(runLoadFrame)) {
+                                aggregator.insertFrame(runLoadFrame);
+                            }
+                            rReader.close();
+                        }
+                        aggregator.finishAggregation();
+                        LinkedList<RunFileReader> runFiles = aggregator.getRunReaders();
+                        if (!runFiles.isEmpty()) {
+                            ExternalHashGrouperMerge merger = new ExternalHashGrouperMerge(ctx, storedKeys,
+                                    framesLimit, comparatorFactories, mergerFactory, internalRecordDescriptor,
+                                    recordDescriptors[0], false, writer);
+                            merger.initialize(aggregator.getGroupTable(), runFiles);
+                        }
+                    }
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java
new file mode 100644
index 0000000..b9ff385
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class SerializableHashTableWithFrameAllocator {
+
+    private static final int INT_SIZE = 4;
+    private static final int INIT_ENTRY_SIZE = 4;
+
+    private IntSerDeBuffer[] headers;
+    private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
+
+    private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
+    private final IFrameAllocator frameAllocator;
+
+    private int frameCapacity = 0;
+    private int currentLargestFrameIndex = 0;
+    private int tupleCount = 0;
+    private int headerFrameCount = 0;
+    private TuplePointer tempTuplePointer = new TuplePointer();
+
+    private int beginFrameIndex, lastFrameIndex;
+
+    public SerializableHashTableWithFrameAllocator(int tableSize, int frameSize, IFrameAllocator frameAllocator)
+            throws HyracksDataException {
+        this.frameAllocator = frameAllocator;
+
+        int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
+        int headerSize = tableSize / frameSize * INT_SIZE * 2 + residual;
+        headers = new IntSerDeBuffer[headerSize];
+
+        this.beginFrameIndex = frameAllocator.allocateFrame();
+        this.lastFrameIndex = beginFrameIndex;
+        byte[] headerFrame = frameAllocator.getFrame(beginFrameIndex);
+        if (headerFrame == null) {
+            throw new HyracksDataException("Cannot initialize the hashtable: not enough memory");
+        }
+
+        IntSerDeBuffer frame = new IntSerDeBuffer(headerFrame);
+        resetFrame(frame);
+        contents.add(frame);
+        frameCurrentIndex.add(0);
+        frameCapacity = frame.capacity();
+    }
+
+    public boolean insert(int entry, TuplePointer pointer) throws HyracksDataException {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header == null) {
+            int newFrameIndex = frameAllocator.allocateFrameForOverflowing(lastFrameIndex);
+            if (newFrameIndex < 0) {
+                return false;
+            }
+            this.lastFrameIndex = newFrameIndex;
+            byte[] newFrame = frameAllocator.getFrame(lastFrameIndex);
+            if (newFrame == null) {
+                return false;
+            }
+            header = new IntSerDeBuffer(newFrame);
+            headers[hFrameIndex] = header;
+            resetFrame(header);
+            headerFrameCount++;
+        }
+        int frameIndex = header.getInt(headerOffset);
+        int offsetIndex = header.getInt(headerOffset + 1);
+        boolean isInserted;
+        if (frameIndex < 0) {
+            // insert first tuple into the entry
+            isInserted = insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+        } else {
+            // insert non-first tuple into the entry
+            isInserted = insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
+        }
+        if (isInserted) {
+            tupleCount++;
+        }
+        return isInserted;
+    }
+
+    public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header == null) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        int frameIndex = header.getInt(headerOffset);
+        int offsetIndex = header.getInt(headerOffset + 1);
+        if (frameIndex < 0) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        IntSerDeBuffer frame = contents.get(frameIndex);
+        int entryUsedItems = frame.getInt(offsetIndex + 1);
+        if (offset > entryUsedItems - 1) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        int startIndex = offsetIndex + 2 + offset * 2;
+        while (startIndex >= frameCapacity) {
+            ++frameIndex;
+            startIndex -= frameCapacity;
+        }
+        frame = contents.get(frameIndex);
+        dataPointer.frameIndex = frame.getInt(startIndex);
+        dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+    }
+
+    public void reset() {
+        for (IntSerDeBuffer frame : headers)
+            if (frame != null)
+                resetFrame(frame);
+
+        frameCurrentIndex.clear();
+        for (int i = 0; i < contents.size(); i++) {
+            frameCurrentIndex.add(0);
+        }
+
+        currentLargestFrameIndex = 0;
+        tupleCount = 0;
+    }
+
+    public int getFrameCount() {
+        return headerFrameCount + contents.size();
+    }
+
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    public void close() {
+        headers = null;
+        contents.clear();
+        frameCurrentIndex.clear();
+        tupleCount = 0;
+        currentLargestFrameIndex = 0;
+        lastFrameIndex = -1;
+        beginFrameIndex = -1;
+    }
+
+    private boolean insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
+            throws HyracksDataException {
+        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
+        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+        int requiredIntCapacity = entryCapacity * 2;
+        int startFrameIndex = currentLargestFrameIndex;
+
+        if (lastIndex + requiredIntCapacity >= frameCapacity) {
+            IntSerDeBuffer newFrame;
+            startFrameIndex++;
+            do {
+                if (currentLargestFrameIndex >= contents.size() - 1) {
+                    int newFrameIndex = frameAllocator.allocateFrameForOverflowing(lastFrameIndex);
+                    this.lastFrameIndex = newFrameIndex;
+                    byte[] allocatedFrame = frameAllocator.getFrame(lastFrameIndex);
+                    if (allocatedFrame == null) {
+                        return false;
+                    }
+                    newFrame = new IntSerDeBuffer(allocatedFrame);
+                    currentLargestFrameIndex++;
+                    contents.add(newFrame);
+                    frameCurrentIndex.add(0);
+                } else {
+                    currentLargestFrameIndex++;
+                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
+                }
+                requiredIntCapacity -= frameCapacity;
+            } while (requiredIntCapacity > 0);
+            lastIndex = 0;
+            lastFrame = contents.get(startFrameIndex);
+        }
+
+        // set header
+        header.writeInt(headerOffset, startFrameIndex);
+        header.writeInt(headerOffset + 1, lastIndex);
+
+        // set the entry
+        lastFrame.writeInt(lastIndex, entryCapacity - 1);
+        lastFrame.writeInt(lastIndex + 1, 1);
+        lastFrame.writeInt(lastIndex + 2, pointer.frameIndex);
+        lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex);
+        int newLastIndex = lastIndex + entryCapacity * 2;
+        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
+        frameCurrentIndex.set(startFrameIndex, newLastIndex);
+
+        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+        while (requiredIntCapacity > 0) {
+            startFrameIndex++;
+            requiredIntCapacity -= frameCapacity;
+            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
+            frameCurrentIndex.set(startFrameIndex, newLastIndex);
+        }
+        return true;
+    }
+
+    private boolean insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex,
+            TuplePointer pointer) throws HyracksDataException {
+        IntSerDeBuffer frame = contents.get(frameIndex);
+        int entryItems = frame.getInt(offsetIndex);
+        int entryUsedItems = frame.getInt(offsetIndex + 1);
+
+        if (entryUsedItems < entryItems) {
+            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
+            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
+            while (startIndex >= frameCapacity) {
+                ++frameIndex;
+                startIndex -= frameCapacity;
+            }
+            frame = contents.get(frameIndex);
+            frame.writeInt(startIndex, pointer.frameIndex);
+            frame.writeInt(startIndex + 1, pointer.tupleIndex);
+        } else {
+            int capacity = (entryItems + 1) * 2;
+            int headFrameVal = header.getInt(headerOffset);
+            int headOffsetVal = header.getInt(headerOffset + 1);
+            header.writeInt(headerOffset, -1);
+            header.writeInt(headerOffset + 1, -1);
+            int fIndex = frame.getInt(offsetIndex + 2);
+            int tIndex = frame.getInt(offsetIndex + 3);
+            tempTuplePointer.frameIndex = fIndex;
+            tempTuplePointer.tupleIndex = tIndex;
+            if (!insertNewEntry(header, headerOffset, capacity, tempTuplePointer)) {
+                header.writeInt(headerOffset, headFrameVal);
+                header.writeInt(headerOffset + 1, headOffsetVal);
+                return false;
+            }
+
+            int newFrameIndex = header.getInt(headerOffset);
+            int newTupleIndex = header.getInt(headerOffset + 1);
+
+            for (int i = 1; i < entryUsedItems; i++) {
+                int startIndex = offsetIndex + 2 + i * 2;
+                int startFrameIndex = frameIndex;
+                while (startIndex >= frameCapacity) {
+                    ++startFrameIndex;
+                    startIndex -= frameCapacity;
+                }
+                frame = contents.get(startFrameIndex);
+                fIndex = frame.getInt(startIndex);
+                tIndex = frame.getInt(startIndex + 1);
+                tempTuplePointer.frameIndex = fIndex;
+                tempTuplePointer.tupleIndex = tIndex;
+                if (!insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer)) {
+                    throw new HyracksDataException("Failed to allocate space to expand the entry size.");
+                }
+            }
+            if (!insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer)) {
+                throw new HyracksDataException("Failed to insert the tuple after the entry has been expanded.");
+            }
+        }
+        return true;
+    }
+
+    private void resetFrame(IntSerDeBuffer frame) {
+        for (int i = 0; i < frameCapacity; i++)
+            frame.writeInt(i, -1);
+    }
+
+    private int getHeaderFrameIndex(int entry) {
+        int frameIndex = entry * 2 / frameCapacity;
+        return frameIndex;
+    }
+
+    private int getHeaderFrameOffset(int entry) {
+        int offset = entry * 2 % frameCapacity;
+        return offset;
+    }
+
+    public int getLastFrame() {
+        return lastFrameIndex;
+    }
+
+    public static int getInitialFrameCount() {
+        return 1;
+    }
+
+    public static int getMaxHeaderFrameCount(int hashtableSize, int frameSize) {
+        return (int) (Math.ceil(hashtableSize * 2 / (frameSize / 4)));
+    }
+
+    private class IntSerDeBuffer {
+
+        private byte[] bytes;
+
+        public IntSerDeBuffer(byte[] data) {
+            this.bytes = data;
+        }
+
+        public int getInt(int pos) {
+            int offset = pos * 4;
+            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
+                    + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+        }
+
+        public void writeInt(int pos, int value) {
+            int offset = pos * 4;
+            bytes[offset++] = (byte) (value >> 24);
+            bytes[offset++] = (byte) (value >> 16);
+            bytes[offset++] = (byte) (value >> 8);
+            bytes[offset++] = (byte) (value);
+        }
+
+        public int capacity() {
+            return bytes.length / 4;
+        }
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java
new file mode 100644
index 0000000..4826c47
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class TupleInFrameAccessor {
+
+    private final RecordDescriptor recordDescriptor;
+
+    private ByteBuffer buffer;
+
+    private int tupleOffset;
+
+    public TupleInFrameAccessor(RecordDescriptor recordDescriptor) {
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    public void reset(ByteBuffer buffer, int tupleOffset) {
+        this.buffer = buffer;
+        this.tupleOffset = tupleOffset;
+    }
+
+    public int getTupleStartOffset() {
+        return tupleOffset;
+    }
+
+    public int getTupleEndOffset() {
+        return tupleOffset + getFieldCount() * 4 + buffer.getInt(tupleOffset + (getFieldCount() - 1) * 4);
+    }
+    
+    public int getFieldStartOffset(int fIdx) {
+        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset() + (fIdx - 1) * 4);
+    }
+
+    public int getFieldEndOffset(int fIdx) {
+        return buffer.getInt(getTupleStartOffset() + fIdx * 4);
+    }
+    
+    public int getFieldLength(int fIdx) {
+        return getFieldEndOffset(fIdx) - getFieldStartOffset(fIdx);
+    }
+    
+    public int getFieldSlotsLength() {
+        return getFieldCount() * 4;
+    }
+
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+    
+    public ByteBuffer getBuffer(){
+        return buffer;
+    }
+    
+    public int getTupleLength(){
+        return getTupleEndOffset() - tupleOffset;
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 2e781b5..760ca5b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -167,6 +167,14 @@
                 return new AggregateState(new Integer[] { 0, 0 });
             }
 
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (useObjectState) {
+                    return 0;
+                } else {
+                    return 8;
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index cc5c1e1..53e62ee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -167,6 +167,14 @@
                     state.state = new Integer[] { sum, count };
                 }
             }
+
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (useObjectState) {
+                    return 0;
+                } else {
+                    return 8;
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 9bfec8e..12f4325 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -135,6 +135,15 @@
                     state.state = count;
                 }
             }
+
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (useObjectState) {
+                    return 0;
+                } else {
+                    return 4;
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
index f8b1d74..851d874 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
@@ -30,37 +30,35 @@
 /**
  *
  */
-public class FloatSumFieldAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
+public class FloatSumFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
 
     private final int aggField;
 
     private final boolean useObjectState;
-    
-    public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState){
+
+    public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState) {
         this.aggField = aggField;
         this.useObjectState = useObjState;
     }
-    
+
     /* (non-Javadoc)
      * @see edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         return new IFieldAggregateDescriptor() {
-            
+
             @Override
             public void reset() {
-                
+
             }
-            
+
             @Override
-            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 float sum;
                 if (!useObjectState) {
                     sum = FloatSerializerDeserializer.getFloat(data, offset);
@@ -73,10 +71,10 @@
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 float sum;
                 if (!useObjectState) {
                     sum = FloatSerializerDeserializer.getFloat(data, offset);
@@ -89,20 +87,19 @@
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
             public boolean needsObjectState() {
                 return useObjectState;
             }
-            
+
             @Override
             public boolean needsBinaryState() {
                 return !useObjectState;
             }
-            
+
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 float sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -121,22 +118,21 @@
                     state.state = sum;
                 }
             }
-            
+
             @Override
             public AggregateState createState() {
                 return new AggregateState(new Float(0.0));
             }
-            
+
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 float sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
@@ -152,6 +148,15 @@
                     state.state = sum;
                 }
             }
+
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (useObjectState) {
+                    return 0;
+                } else {
+                    return 4;
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 7d85deb..d5f8637 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -151,6 +151,15 @@
                     state.state = sum;
                 }
             }
+
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (useObjectState) {
+                    return 0;
+                } else {
+                    return 4;
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 94ebcbd..b8df344 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -194,6 +194,15 @@
                 return new AggregateState();
             }
 
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+                if (hasBinaryState) {
+                    return 4;
+                } else {
+                    return 0;
+                }
+            }
+
         };
     }
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 394a5e3..8d06708 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
 
 /**
  *
@@ -203,6 +204,43 @@
                     }
                 }
             }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+                    AggregateState state) throws HyracksDataException {
+                if (stateAccessor != null) {
+                    int stateTupleOffset = stateAccessor.getTupleStartOffset();
+                    int fieldIndex = 0;
+                    for (int i = 0; i < aggregators.length; i++) {
+                        if (aggregators[i].needsBinaryState()) {
+                            int stateFieldOffset = stateAccessor.getFieldStartOffset(keys.length + fieldIndex);
+                            aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
+                                    stateTupleOffset + stateAccessor.getFieldSlotsLength() + stateFieldOffset,
+                                    ((AggregateState[]) state.state)[i]);
+                            fieldIndex++;
+                        } else {
+                            aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
+                    }
+                }
+            }
+
+            @Override
+            public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int initLength = 0;
+                for (int i = 0; i < aggregators.length; i++)
+                    initLength += aggregators[i].getInitSize(accessor, tIndex);
+                return initLength;
+            }
+
+            @Override
+            public int getFieldCount() {
+                return aggregators.length;
+            }
         };
     }
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java
new file mode 100644
index 0000000..544d8a7
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.ImprovedHybridHashGroupFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.ImprovedHybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+
+public class ImprovedHybridHashGroupTest extends AbstractIntegrationTest {
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+    final int inputSize = 2300;
+
+    final int frameLimits = 6;
+
+    final double factor = 1.2;
+
+    final int hashtableSize = 12000;
+
+    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, }, '|');
+
+    private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
+            throws IOException {
+
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
+
+        return printer;
+    }
+
+    @Test
+    public void singleKeySumGroupOneNodeTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits, inputSize, factor, hashtableSize, keyFields,
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false),
+                                new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new FloatSumFieldAggregatorFactory(3, false) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID);
+
+        IConnectorDescriptor conn1 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumGroupOneNodeTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeySumGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits, inputSize, factor, hashtableSize, keyFields,
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false),
+                                new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new FloatSumFieldAggregatorFactory(3, false) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeyAvgExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits, inputSize, factor, hashtableSize, keyFields,
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new AvgFieldMergeAggregatorFactory(3, false) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits, inputSize, factor, hashtableSize, keyFields,
+                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void multiKeySumExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 8, 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits * 2, 3 * inputSize, factor, hashtableSize * 3, keyFields, new IBinaryHashFunctionFamily[] {
+                        UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void multiKeyAvgExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 8, 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits * 2, 3 * inputSize, factor, 3 * hashtableSize, keyFields, new IBinaryHashFunctionFamily[] {
+                        UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false),
+                                new AvgFieldMergeAggregatorFactory(4, false) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 8, 0 };
+
+        ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+                frameLimits * 2, 3 * inputSize, factor, 3 * hashtableSize, keyFields, new IBinaryHashFunctionFamily[] {
+                        UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(2, false),
+                        new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+}