Merge HybridHash from Jarod branch

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2689 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
new file mode 100644
index 0000000..d63609e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
@@ -0,0 +1,377 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
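+/**
+ * A frame reader that merges a set of sorted, grouped run files. Records in the runs are ordered
+ * by their hash bucket id and then by the group keys; records with identical keys that meet at the
+ * head of the priority queue are aggregated on the fly before being appended to the output.
+ */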
+public class GroupRunMergingFrameReader implements IFrameReader {
+
+    private static final int INT_SIZE = 4;
+
+    private final IHyracksTaskContext ctx;
+    private final IFrameReader[] runCursors;
+    private final List<ByteBuffer> inFrames;
+    private final int[] keyFields;
+    private final int framesLimit;
+    private final int tableSize;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final FrameTupleAppender outFrameAppender;
+    private final ITuplePartitionComputer tpc;
+    private ReferencedPriorityQueue topTuples;
+    private int[] tupleIndexes;
+    private int[] currentFrameIndexForRuns, bufferedFramesForRuns;
+    private FrameTupleAccessor[] tupleAccessors;
+    private int framesBuffered;
+
+    private final IAggregatorDescriptor grouper;
+    private final AggregateState groupState;
+
+    private final boolean isLoadBuffered;
+
+    private final boolean isFinalPhase;
+
+    private final ArrayTupleBuilder groupTupleBuilder, outputTupleBuilder;
+
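+    /**
+     * A one-tuple mini-frame caching the current group's partial aggregate, so that records with
+     * the same key arriving consecutively from the merged runs are aggregated before being emitted.
+     */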
+    private byte[] groupResultCache;
+    private ByteBuffer groupResultCacheBuffer;
+    private IFrameTupleAccessor groupResultCacheAccessor;
+    private FrameTupleAppender groupResultCacheAppender;
+
+    // FIXME: debugging counters for the number of key comparisons in the priority queue and in the
+    // merge (group-cache) path.
+    long queueCompCounter = 0, mergeCompCounter = 0;
+
+    public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
+            int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
+            IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase) {
+        this(ctx, runCursors, framesLimit, tableSize, keyFields, tpc, comparators, grouper, recordDesc, isFinalPhase,
+                false);
+    }
+
+    public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
+            int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
+            IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase, boolean isLoadBuffered) {
+        this.ctx = ctx;
+        this.runCursors = runCursors;
+        this.inFrames = new ArrayList<ByteBuffer>();
+        this.keyFields = keyFields;
+        this.tableSize = tableSize;
+        this.comparators = comparators;
+        this.recordDesc = recordDesc;
+        this.grouper = grouper;
+        this.groupState = grouper.createAggregateStates();
+        this.outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+        this.isLoadBuffered = isLoadBuffered;
+        this.isFinalPhase = isFinalPhase;
+        this.framesLimit = framesLimit;
+        this.tpc = tpc;
+
+        this.groupTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
+        this.outputTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#open()
+     */
+    @Override
+    public void open() throws HyracksDataException {
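+        // When isLoadBuffered is set, pre-allocate (framesLimit - 1) frames and divide them evenly
+        // among the runs; otherwise each run gets a single input frame.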
+        if (isLoadBuffered) {
+            while (inFrames.size() + 1 < framesLimit) {
+                inFrames.add(ctx.allocateFrame());
+            }
+            framesBuffered = inFrames.size() / runCursors.length;
+        } else {
+            while (inFrames.size() < framesLimit - 1 && inFrames.size() < runCursors.length) {
+                inFrames.add(ctx.allocateFrame());
+            }
+            framesBuffered = 1;
+        }
+        tupleAccessors = new FrameTupleAccessor[runCursors.length];
+        currentFrameIndexForRuns = new int[runCursors.length];
+        bufferedFramesForRuns = new int[runCursors.length];
+        Comparator<ReferenceEntryWithBucketID> comparator = createEntryComparator(comparators);
+        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+        tupleIndexes = new int[runCursors.length];
+
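+        // The loop relies on the priority queue initially returning each run id once via peek(),
+        // so that every run is opened and its initial frames are loaded exactly once.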
+        for (int i = 0; i < runCursors.length; i++) {
+            int runIndex = topTuples.peek().getRunid();
+            tupleIndexes[runIndex] = 0;
+            runCursors[runIndex].open();
+            for (int j = 0; j < framesBuffered; j++) {
+
+                if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
+
+                    bufferedFramesForRuns[runIndex]++;
+                    if (j == 0) {
+                        tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+                        tupleAccessors[runIndex].reset(inFrames.get(runIndex * framesBuffered + j));
+                        setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                        currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#nextFrame(java.nio.ByteBuffer)
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        outFrameAppender.reset(buffer, true);
+
+        while (!topTuples.areRunsExhausted()) {
+            ReferenceEntryWithBucketID top = topTuples.peek();
+            int runIndex = top.getRunid();
+            FrameTupleAccessor fta = top.getAccessor();
+            int tupleIndex = top.getTupleIndex();
+
+            // check whether we can do aggregation
+            boolean needInsert = true;
+            if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+                groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
+                if (compareFrameTuples(fta, tupleIndex, groupResultCacheAccessor, 0) == 0) {
+                    needInsert = false;
+                }
+            }
+
+            if (needInsert) {
+
+                // try to flush the group cache into the output buffer, if any
+                if (groupResultCacheAccessor != null && groupResultCacheAccessor.getTupleCount() > 0) {
+                    outputTupleBuilder.reset();
+                    for (int k = 0; k < keyFields.length; k++) {
+                        outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
+                    }
+                    if (isFinalPhase) {
+                        grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+                    } else {
+                        grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+                    }
+
+                    // return if the buffer is full
+                    if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                        return true;
+                    }
+                    groupResultCacheBuffer.putInt(groupResultCache.length - INT_SIZE, 0);
+                }
+
+                groupTupleBuilder.reset();
+                for (int k : keyFields) {
+                    groupTupleBuilder.addField(fta, tupleIndex, k);
+                }
+                grouper.init(groupTupleBuilder, fta, tupleIndex, groupState);
+
+                // enlarge the cache buffer if necessary
+                int requiredSize = groupTupleBuilder.getSize() + groupTupleBuilder.getFieldEndOffsets().length
+                        * INT_SIZE + 2 * INT_SIZE;
+
+                if (groupResultCache == null || groupResultCache.length < requiredSize) {
+                    groupResultCache = new byte[requiredSize];
+                    groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
+                    groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
+                    groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, recordDesc);
+                }
+
+                // always reset the group cache
+                groupResultCacheAppender.reset(groupResultCacheBuffer, true);
+                if (!groupResultCacheAppender.append(groupTupleBuilder.getFieldEndOffsets(),
+                        groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
+                    throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
+                }
+
+                groupResultCacheAccessor.reset(groupResultCacheBuffer);
+
+            } else {
+                grouper.aggregate(fta, tupleIndex, groupResultCacheAccessor, 0, groupState);
+            }
+
+            ++tupleIndexes[runIndex];
+            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+        }
+
+        if (groupResultCacheAccessor != null && groupResultCacheAccessor.getTupleCount() > 0) {
+            outputTupleBuilder.reset();
+            for (int k = 0; k < keyFields.length; k++) {
+                outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
+            }
+            if (isFinalPhase) {
+                grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+            } else {
+                grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+            }
+
+            // return if the buffer is full
+            if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
+                    outputTupleBuilder.getSize())) {
+                return true;
+            }
+
+            groupResultCacheAccessor = null;
+            groupResultCache = null;
+            groupResultCacheBuffer = null;
+            groupResultCacheAppender = null;
+        }
+
+        if (outFrameAppender.getTupleCount() > 0) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#close()
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < runCursors.length; ++i) {
+            closeRun(i, runCursors, tupleAccessors);
+        }
+    }
+
+    private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+        if (exists) {
+            int h = tpc.partition(tupleAccessors[runIndex], tupleIndexes[runIndex], tableSize);
+            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex], h);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+            return false;
+        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+            if (currentFrameIndexForRuns[runIndex] - runIndex * framesBuffered < bufferedFramesForRuns[runIndex] - 1) {
+                currentFrameIndexForRuns[runIndex]++;
+            } else {
+                bufferedFramesForRuns[runIndex] = 0;
+                for (int j = 0; j < framesBuffered; j++) {
+                    if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
+                        bufferedFramesForRuns[runIndex]++;
+                    } else {
+                        break;
+                    }
+                }
+                currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
+            }
+            if (bufferedFramesForRuns[runIndex] > 0) {
+                tupleAccessors[runIndex].reset(inFrames.get(currentFrameIndexForRuns[runIndex]));
+                tupleIndexes[runIndex] = 0;
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
+    private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+            throws HyracksDataException {
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+            runCursors[index] = null;
+            tupleAccessors[index] = null;
+        }
+    }
+
+    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+        mergeCompCounter++;
+        byte[] b1 = fta1.getBuffer().array();
+        byte[] b2 = fta2.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = f;
+            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+            int l1 = fta1.getFieldLength(j1, fIdx);
+            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+            int l2 = fta2.getFieldLength(j2, fIdx);
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private Comparator<ReferenceEntryWithBucketID> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntryWithBucketID>() {
+            public int compare(ReferenceEntryWithBucketID tp1, ReferenceEntryWithBucketID tp2) {
+
+                queueCompCounter++;
+
+                int cmp = tp1.getBucketID() - tp2.getBucketID();
+
+                if (cmp != 0) {
+                    return cmp;
+                }
+
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                int j1 = tp1.getTupleIndex();
+                int j2 = tp2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < keyFields.length; ++f) {
+                    int fIdx = keyFields[f];
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+
+                return cmp;
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
new file mode 100644
index 0000000..57a364b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
@@ -0,0 +1,687 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAccessorForGroupHashtable;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAppenderForGroupHashtable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
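+/**
+ * An in-memory hash table for group-by aggregation. Each bucket keeps its records in a linked list
+ * chained through per-record (frame index, tuple index) references. When the content frames are
+ * full, every bucket is sorted on the group keys and the whole table is flushed to a run file,
+ * to be merged later.
+ */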
+public class HybridHashSortGroupHashTable {
+
+    protected static final int INT_SIZE = 4;
+    protected static final int INIT_REF_COUNT = 8;
+    protected static final int PTR_SIZE = 3;
+
+    protected final int tableSize, framesLimit, frameSize;
+
+    protected final ByteBuffer[] headers;
+    protected final ByteBuffer[] contents;
+
+    protected final IHyracksTaskContext ctx;
+
+    protected int currentLargestFrameIndex;
+    protected int totalTupleCount;
+
+    protected final IAggregatorDescriptor aggregator;
+    protected final AggregateState aggState;
+
+    protected final int[] keys, internalKeys;
+
+    private final IBinaryComparator[] comparators;
+
+    protected final ITuplePartitionComputer tpc;
+
+    protected final INormalizedKeyComputer firstNormalizer;
+
+    private ByteBuffer outputBuffer;
+
+    private LinkedList<RunFileReader> runReaders;
+
+    protected TuplePointer matchPointer;
+
+    protected final FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
+
+    private final FrameTupleAccessorForGroupHashtable compFrameAccessor1, compFrameAccessor2;
+
+    protected final FrameTupleAppenderForGroupHashtable internalAppender;
+
+    private final FrameTupleAppender outputAppender;
+
+    /**
+     * Tuple builders: one for records to be inserted into the hash table, and one for building
+     * the output (spill) records.
+     */
+    protected final ArrayTupleBuilder internalTupleBuilder, outputTupleBuilder;
+
+    /**
+     * Pointers used for sorting the records within a hash table entry; each record takes
+     * {@link #PTR_SIZE} slots: frame index, tuple index, and normalized key.
+     */
+    protected int[] tPointers;
+
+    protected int usedEntries = 0;
+
+    protected long hashedKeys = 0, hashedRawRec = 0;
+
+    public HybridHashSortGroupHashTable(IHyracksTaskContext ctx, int frameLimits, int tableSize, int[] keys,
+            IBinaryComparator[] comparators, ITuplePartitionComputer tpc,
+            INormalizedKeyComputer firstNormalizerComputer, IAggregatorDescriptor aggregator,
+            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc) {
+        this.ctx = ctx;
+        this.tableSize = tableSize;
+        this.framesLimit = frameLimits;
+        this.frameSize = ctx.getFrameSize();
+
+        this.keys = keys;
+        this.internalKeys = new int[keys.length];
+        for (int i = 0; i < internalKeys.length; i++) {
+            internalKeys[i] = i;
+        }
+
+        this.aggregator = aggregator;
+        this.aggState = aggregator.createAggregateStates();
+
+        this.tpc = tpc;
+        this.comparators = comparators;
+        this.firstNormalizer = firstNormalizerComputer;
+
+        // initialize the hash table
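+        // Each entry takes 2 * INT_SIZE bytes in the header area (the frame index and the tuple index
+        // of the first record in its chain), so ceil(tableSize * 2 * INT_SIZE / frameSize) header
+        // frames are allocated below.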
+        int residual = ((tableSize % frameSize) * INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
+        this.headers = new ByteBuffer[tableSize / frameSize * INT_SIZE * 2 + tableSize % frameSize * 2 * INT_SIZE
+                / frameSize + residual];
+
+        this.outputBuffer = ctx.allocateFrame();
+
+        this.contents = new ByteBuffer[framesLimit - 1 - headers.length];
+        this.currentLargestFrameIndex = -1;
+        this.totalTupleCount = 0;
+
+        this.runReaders = new LinkedList<RunFileReader>();
+        this.hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+        this.compFrameAccessor1 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+        this.compFrameAccessor2 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+
+        this.internalTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
+        this.outputTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
+        this.internalAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
+        this.outputAppender = new FrameTupleAppender(frameSize);
+
+        this.matchPointer = new TuplePointer();
+
+    }
+
+    /**
+     * Reset the given header frame by setting all entry references to -1.
+     * 
+     * @param headerFrameIndex
+     */
+    protected void resetHeader(int headerFrameIndex) {
+        for (int i = 0; i < frameSize; i += INT_SIZE) {
+            headers[headerFrameIndex].putInt(i, -1);
+        }
+    }
+
+    /**
+     * Get the index of the header frame containing the reference of the given hash table entry.
+     * 
+     * @param entry
+     * @return the header frame index
+     */
+    protected int getHeaderFrameIndex(int entry) {
+        int frameIndex = entry / frameSize * 2 * INT_SIZE + entry % frameSize * 2 * INT_SIZE / frameSize;
+        return frameIndex;
+    }
+
+    /**
+     * Get the byte offset, within its header frame, of the reference of the given hash table entry.
+     * 
+     * @param entry
+     * @return the offset in the header frame
+     */
+    protected int getHeaderTupleIndex(int entry) {
+        int offset = entry % frameSize * 2 * INT_SIZE % frameSize;
+        return offset;
+    }
+
+    public void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
+
+        int entry = tpc.partition(accessor, tupleIndex, tableSize);
+
+        hashedRawRec++;
+
+        if (findMatch(entry, accessor, tupleIndex)) {
+            // find match; do aggregation
+            hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+            aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex, aggState);
+        } else {
+
+            internalTupleBuilder.reset();
+            for (int k = 0; k < keys.length; k++) {
+                internalTupleBuilder.addField(accessor, tupleIndex, keys[k]);
+            }
+            aggregator.init(internalTupleBuilder, accessor, tupleIndex, aggState);
+            int insertFrameIndex = -1, insertTupleIndex = -1;
+            boolean inserted = false;
+
+            if (currentLargestFrameIndex < 0) {
+                currentLargestFrameIndex = 0;
+            }
+
+            if (contents[currentLargestFrameIndex] == null) {
+                contents[currentLargestFrameIndex] = ctx.allocateFrame();
+            }
+
+            internalAppender.reset(contents[currentLargestFrameIndex], false);
+            if (internalAppender.append(internalTupleBuilder.getFieldEndOffsets(), internalTupleBuilder.getByteArray(),
+                    0, internalTupleBuilder.getSize())) {
+                inserted = true;
+                insertFrameIndex = currentLargestFrameIndex;
+                insertTupleIndex = internalAppender.getTupleCount() - 1;
+            }
+
+            if (!inserted && currentLargestFrameIndex < contents.length - 1) {
+                currentLargestFrameIndex++;
+                if (contents[currentLargestFrameIndex] == null) {
+                    contents[currentLargestFrameIndex] = ctx.allocateFrame();
+                }
+                internalAppender.reset(contents[currentLargestFrameIndex], true);
+                if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Failed to insert an aggregation value.");
+                } else {
+                    insertFrameIndex = currentLargestFrameIndex;
+                    insertTupleIndex = internalAppender.getTupleCount() - 1;
+                    inserted = true;
+                }
+            }
+
+            // memory is full
+            if (!inserted) {
+                // flush hash table and try to insert again
+                flush();
+
+                // update the match point to the header reference
+                matchPointer.frameIndex = -1;
+                matchPointer.tupleIndex = -1;
+                // re-insert
+                currentLargestFrameIndex++;
+                if (contents[currentLargestFrameIndex] == null) {
+                    contents[currentLargestFrameIndex] = ctx.allocateFrame();
+                }
+                internalAppender.reset(contents[currentLargestFrameIndex], true);
+                if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Failed to insert an aggregation value.");
+                } else {
+                    insertFrameIndex = currentLargestFrameIndex;
+                    insertTupleIndex = internalAppender.getTupleCount() - 1;
+                }
+            }
+
+            // no match; new insertion
+            if (matchPointer.frameIndex < 0) {
+                // first record for this entry; update the header references
+                int headerFrameIndex = getHeaderFrameIndex(entry);
+                int headerFrameOffset = getHeaderTupleIndex(entry);
+                if (headers[headerFrameIndex] == null) {
+                    headers[headerFrameIndex] = ctx.allocateFrame();
+                    resetHeader(headerFrameIndex);
+                }
+                headers[headerFrameIndex].putInt(headerFrameOffset, insertFrameIndex);
+                headers[headerFrameIndex].putInt(headerFrameOffset + INT_SIZE, insertTupleIndex);
+                usedEntries++;
+
+            } else {
+                // update the previous reference
+                hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+                int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
+                contents[matchPointer.frameIndex].putInt(refOffset, insertFrameIndex);
+                contents[matchPointer.frameIndex].putInt(refOffset + INT_SIZE, insertTupleIndex);
+            }
+            hashedKeys++;
+            totalTupleCount++;
+        }
+    }
+
+    /**
+     * Flush the hash table directly to the output
+     */
+    public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
+
+        // FIXME: remove this 
+        outputAppender.reset(outputBuffer, true);
+        for (int i = 0; i < contents.length; i++) {
+            if (contents[i] == null) {
+                continue;
+            }
+            hashtableRecordAccessor.reset(contents[i]);
+            int tupleCount = hashtableRecordAccessor.getTupleCount();
+            for (int j = 0; j < tupleCount; j++) {
+                outputTupleBuilder.reset();
+
+                int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(j);
+                int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
+
+                for (int k = 0; k < internalKeys.length; k++) {
+                    outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
+                            + hashtableRecordAccessor.getFieldStartOffset(j, k),
+                            hashtableRecordAccessor.getFieldLength(j, k));
+                }
+
+                aggregator.outputFinalResult(outputTupleBuilder, hashtableRecordAccessor, j, aggState);
+
+                if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
+                        0, outputTupleBuilder.getSize())) {
+
+                    FrameUtils.flushFrame(outputBuffer, outputWriter);
+
+                    outputAppender.reset(outputBuffer, true);
+                    if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                        throw new HyracksDataException("Failed to flush the hash table to the final output");
+                    }
+                }
+            }
+        }
+
+        if (outputAppender.getTupleCount() > 0) {
+
+            FrameUtils.flushFrame(outputBuffer, outputWriter);
+
+            outputAppender.reset(outputBuffer, true);
+        }
+
+        totalTupleCount = 0;
+        usedEntries = 0;
+    }
+
+    /**
+     * Flush hash table into a run file.
+     * 
+     * @throws HyracksDataException
+     */
+    protected void flush() throws HyracksDataException {
+
+        long methodTimer = System.nanoTime();
+
+        FileReference runFile;
+        try {
+            runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+                    HybridHashSortGroupHashTable.class.getSimpleName());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        RunFileWriter runWriter = new RunFileWriter(runFile, ctx.getIOManager());
+        runWriter.open();
+        flushEntries(runWriter);
+        runWriter.close();
+        runReaders.add(runWriter.createReader());
+        reset();
+
+        ctx.getCounterContext()
+                .getCounter("optional." + HybridHashSortGroupHashTable.class.getSimpleName() + ".flush.time", true)
+                .update(System.nanoTime() - methodTimer);
+    }
+
+    private void flushEntries(IFrameWriter writer) throws HyracksDataException {
+
+        outputAppender.reset(outputBuffer, true);
+        for (int i = 0; i < tableSize; i++) {
+            int tupleInEntry = sortEntry(i);
+
+            for (int ptr = 0; ptr < tupleInEntry; ptr++) {
+                int frameIndex = tPointers[ptr * PTR_SIZE];
+                int tupleIndex = tPointers[ptr * PTR_SIZE + 1];
+
+                hashtableRecordAccessor.reset(contents[frameIndex]);
+                outputTupleBuilder.reset();
+
+                int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(tupleIndex);
+                int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
+
+                for (int k = 0; k < internalKeys.length; k++) {
+                    outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
+                            + hashtableRecordAccessor.getFieldStartOffset(tupleIndex, k),
+                            hashtableRecordAccessor.getFieldLength(tupleIndex, k));
+                }
+
+                aggregator.outputPartialResult(outputTupleBuilder, hashtableRecordAccessor, tupleIndex, aggState);
+
+                if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
+                        0, outputTupleBuilder.getSize())) {
+
+                    FrameUtils.flushFrame(outputBuffer, writer);
+
+                    outputAppender.reset(outputBuffer, true);
+                    if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                        throw new HyracksDataException("Failed to flush an aggregation result.");
+                    }
+                }
+                totalTupleCount--;
+            }
+
+            if (tupleInEntry > 0) {
+                usedEntries--;
+            }
+        }
+
+        if (outputAppender.getTupleCount() > 0) {
+
+            FrameUtils.flushFrame(outputBuffer, writer);
+
+            outputAppender.reset(outputBuffer, true);
+        }
+    }
+
+    protected int sortEntry(int entryID) {
+
+        if (tPointers == null)
+            tPointers = new int[INIT_REF_COUNT * PTR_SIZE];
+        int ptr = 0;
+
+        int headerFrameIndex = getHeaderFrameIndex(entryID);
+        int headerFrameOffset = getHeaderTupleIndex(entryID);
+
+        if (headers[headerFrameIndex] == null) {
+            return 0;
+        }
+
+        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+        do {
+            if (entryFrameIndex < 0) {
+                break;
+            }
+            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+            tPointers[ptr * PTR_SIZE] = entryFrameIndex;
+            tPointers[ptr * PTR_SIZE + 1] = entryTupleIndex;
+            int tStart = hashtableRecordAccessor.getTupleStartOffset(entryTupleIndex);
+            int f0StartRel = hashtableRecordAccessor.getFieldStartOffset(entryTupleIndex, internalKeys[0]);
+            int f0EndRel = hashtableRecordAccessor.getFieldEndOffset(entryTupleIndex, internalKeys[0]);
+            int f0Start = f0StartRel + tStart + hashtableRecordAccessor.getFieldSlotsLength();
+            tPointers[ptr * PTR_SIZE + 2] = firstNormalizer == null ? 0 : firstNormalizer.normalize(
+                    hashtableRecordAccessor.getBuffer().array(), f0Start, f0EndRel - f0StartRel);
+
+            ptr++;
+
+            if (ptr * PTR_SIZE >= tPointers.length) {
+                int[] newTPointers = new int[tPointers.length * 2];
+                System.arraycopy(tPointers, 0, newTPointers, 0, tPointers.length);
+                tPointers = newTPointers;
+            }
+
+            // move to the next record
+            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+            int prevFrameIndex = entryFrameIndex;
+            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+
+        } while (true);
+
+        // sort records
+        if (ptr > 1) {
+            sort(0, ptr);
+        }
+
+        return ptr;
+    }
+
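+    // Quicksort over the pointer triples: compare by the 4-byte normalized key first (as an
+    // unsigned int) and fall back to the full key comparators on ties.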
+    protected void sort(int offset, int len) {
+        int m = offset + (len >> 1);
+        int mFrameIndex = tPointers[m * PTR_SIZE];
+        int mTupleIndex = tPointers[m * PTR_SIZE + 1];
+        int mNormKey = tPointers[m * PTR_SIZE + 2];
+        compFrameAccessor1.reset(contents[mFrameIndex]);
+
+        int a = offset;
+        int b = a;
+        int c = offset + len - 1;
+        int d = c;
+        while (true) {
+            while (b <= c) {
+                int bFrameIndex = tPointers[b * PTR_SIZE];
+                int bTupleIndex = tPointers[b * PTR_SIZE + 1];
+                int bNormKey = tPointers[b * PTR_SIZE + 2];
+                int cmp = 0;
+                if (bNormKey != mNormKey) {
+                    cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                } else {
+                    compFrameAccessor2.reset(contents[bFrameIndex]);
+                    cmp = compare(compFrameAccessor2, bTupleIndex, compFrameAccessor1, mTupleIndex);
+                }
+                if (cmp > 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(a++, b);
+                }
+                ++b;
+            }
+            while (c >= b) {
+                int cFrameIndex = tPointers[c * PTR_SIZE];
+                int cTupleIndex = tPointers[c * PTR_SIZE + 1];
+                int cNormKey = tPointers[c * PTR_SIZE + 2];
+                int cmp = 0;
+                if (cNormKey != mNormKey) {
+                    cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                } else {
+                    compFrameAccessor2.reset(contents[cFrameIndex]);
+                    cmp = compare(compFrameAccessor2, cTupleIndex, compFrameAccessor1, mTupleIndex);
+                }
+                if (cmp < 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(c, d--);
+                }
+                --c;
+            }
+            if (b > c)
+                break;
+            swap(b++, c--);
+        }
+
+        int s;
+        int n = offset + len;
+        s = Math.min(a - offset, b - a);
+        vecswap(offset, b - s, s);
+        s = Math.min(d - c, n - d - 1);
+        vecswap(b, n - s, s);
+
+        if ((s = b - a) > 1) {
+            sort(offset, s);
+        }
+        if ((s = d - c) > 1) {
+            sort(n - s, s);
+        }
+    }
+
+    private void swap(int a, int b) {
+        for (int i = 0; i < PTR_SIZE; i++) {
+            int t = tPointers[a * PTR_SIZE + i];
+            tPointers[a * PTR_SIZE + i] = tPointers[b * PTR_SIZE + i];
+            tPointers[b * PTR_SIZE + i] = t;
+        }
+    }
+
+    private void vecswap(int a, int b, int n) {
+        for (int i = 0; i < n; i++, a++, b++) {
+            swap(a, b);
+        }
+    }
+
+    protected boolean findMatch(int entry, FrameTupleAccessor accessor, int tupleIndex) {
+
+        // reset the match pointer
+        matchPointer.frameIndex = -1;
+        matchPointer.tupleIndex = -1;
+
+        // get reference in the header
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int headerFrameOffset = getHeaderTupleIndex(entry);
+
+        if (headers[headerFrameIndex] == null) {
+            return false;
+        }
+
+        // initialize the pointer to the first record 
+        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+        while (entryFrameIndex >= 0) {
+            matchPointer.frameIndex = entryFrameIndex;
+            matchPointer.tupleIndex = entryTupleIndex;
+            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+
+            if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
+                return true;
+            }
+            // Move to the next record in this entry following the linked list
+            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+            int prevFrameIndex = entryFrameIndex;
+            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+        }
+
+        return false;
+    }
+
+    public LinkedList<RunFileReader> getRunFileReaders() {
+        return runReaders;
+    }
+
+    private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
+            int hashTupleIndex) {
+        int tStart0 = accessor.getTupleStartOffset(tupleIndex);
+        int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
+        int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < keys.length; ++i) {
+            int fStart0 = accessor.getFieldStartOffset(tupleIndex, keys[i]);
+            int fEnd0 = accessor.getFieldEndOffset(tupleIndex, keys[i]);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
+            int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private int compare(FrameTupleAccessorForGroupHashtable accessor1, int tupleIndex1,
+            FrameTupleAccessorForGroupHashtable accessor2, int tupleIndex2) {
+        int tStart1 = accessor1.getTupleStartOffset(tupleIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        int tStart2 = accessor2.getTupleStartOffset(tupleIndex2);
+        int fStartOffset2 = accessor2.getFieldSlotsLength() + tStart2;
+
+        for (int i = 0; i < internalKeys.length; ++i) {
+            int fStart1 = accessor1.getFieldStartOffset(tupleIndex1, internalKeys[i]);
+            int fEnd1 = accessor1.getFieldEndOffset(tupleIndex1, internalKeys[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int fStart2 = accessor2.getFieldStartOffset(tupleIndex2, internalKeys[i]);
+            int fEnd2 = accessor2.getFieldEndOffset(tupleIndex2, internalKeys[i]);
+            int fLen2 = fEnd2 - fStart2;
+
+            int c = comparators[i].compare(accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1, accessor2
+                    .getBuffer().array(), fStart2 + fStartOffset2, fLen2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    public void reset() {
+        for (int i = 0; i < headers.length; i++) {
+            if (headers[i] != null) {
+                resetHeader(i);
+            }
+        }
+        for (int i = 0; i < contents.length; i++) {
+            if (contents[i] != null) {
+                contents[i].putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
+            }
+        }
+
+        usedEntries = 0;
+        totalTupleCount = 0;
+        currentLargestFrameIndex = -1;
+    }
+
+    public void finishup() throws HyracksDataException {
+        if (runReaders.size() > 0) {
+            flush();
+        }
+
+        hashedKeys = 0;
+        hashedRawRec = 0;
+    }
+
+    /**
+     * Close the hash table. Note that only the memory allocated for frames is freed; the
+     * aggregation state in {@link #aggState} and the run file readers in {@link #runReaders}
+     * remain valid for later processing.
+     */
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < headers.length; i++) {
+            headers[i] = null;
+        }
+        for (int i = 0; i < contents.length; i++) {
+            contents[i] = null;
+        }
+        outputBuffer = null;
+        tPointers = null;
+    }
+
+    public int getTupleCount() {
+        return totalTupleCount;
+    }
+
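+    // Note: despite its name, this returns the number of frames used by the table
+    // (header frames + content frames + the output buffer), not the frame size in bytes.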
+    public int getFrameSize() {
+        return headers.length + contents.length + 1;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
new file mode 100644
index 0000000..5296c9f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
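+/**
+ * A group-by operator that aggregates records in a {@link HybridHashSortGroupHashTable} and, in a
+ * second activity, either flushes the in-memory table directly to the output (when nothing was
+ * spilled) or merges the spilled sorted runs using a {@link HybridHashSortRunMerger}.
+ */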
+public class HybridHashSortGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+    private static final int MERGE_ACTIVITY_ID = 1;
+
+    private static final long serialVersionUID = 1L;
+    private final int[] keyFields, storedKeyFields;
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final IAggregatorDescriptorFactory mergerFactory;
+
+    private final ITuplePartitionComputerFactory aggTpcf, mergeTpcf;
+
+    private final int framesLimit;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    private final int tableSize;
+
+    private final boolean isLoadOptimized;
+
+    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+            ITuplePartitionComputerFactory mergeTpcf, IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory, RecordDescriptor recordDescriptor) {
+        this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, null, aggregatorFactory,
+                mergerFactory, recordDescriptor, false);
+    }
+
+    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+            ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor recordDescriptor) {
+        this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, firstNormalizerFactory,
+                aggregatorFactory, mergerFactory, recordDescriptor, false);
+    }
+
+    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+            ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor recordDescriptor, boolean isLoadOpt) {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        if (framesLimit <= 2) {
+            /**
+             * Minimum of 3 frames: 2 for in-memory hash table, and 1 for output
+             * aggregation results.
+             */
+            throw new IllegalStateException("frame limit should at least be 3, but it is " + framesLimit + "!");
+        }
+
+        storedKeyFields = new int[keyFields.length];
+        for (int i = 0; i < storedKeyFields.length; i++) {
+            storedKeyFields[i] = i;
+        }
+        this.aggregatorFactory = aggregatorFactory;
+        this.mergerFactory = mergerFactory;
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.aggTpcf = aggTpcf;
+        this.mergeTpcf = mergeTpcf;
+        this.tableSize = tableSize;
+
+        /**
+         * Set the record descriptor. Note that since this operator is a unary
+         * operator, only the first record descriptor is used here.
+         */
+        recordDescriptors[0] = recordDescriptor;
+
+        this.isLoadOptimized = isLoadOpt;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities(edu.uci.ics.hyracks.api.dataflow.
+     * IActivityGraphBuilder)
+     */
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+        builder.addActivity(this, aggregateAct);
+        builder.addSourceEdge(0, aggregateAct, 0);
+
+        builder.addActivity(this, mergeAct);
+        builder.addTargetEdge(0, mergeAct, 0);
+
+        builder.addBlockingEdge(aggregateAct, mergeAct);
+    }
+
+    public static class AggregateActivityState extends AbstractStateObject {
+
+        private HybridHashSortGroupHashTable gTable;
+
+        public AggregateActivityState() {
+        }
+
+        private AggregateActivityState(JobId jobId, TaskId tId) {
+            super(jobId, tId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class AggregateActivity extends AbstractActivityNode {
+
+        private static final long serialVersionUID = 1L;
+
+        public AggregateActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+                HybridHashSortGroupHashTable serializableGroupHashtable;
+
+                FrameTupleAccessor accessor;
+
+                @Override
+                public void open() throws HyracksDataException {
+
+                    RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+
+                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                    for (int i = 0; i < comparatorFactories.length; i++) {
+                        comparators[i] = comparatorFactories[i].createBinaryComparator();
+                    }
+
+                    // firstNormalizerFactory may be null (see the constructors that do not take one)
+                    serializableGroupHashtable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
+                            keyFields, comparators, aggTpcf.createPartitioner(),
+                            firstNormalizerFactory == null ? null : firstNormalizerFactory
+                                    .createNormalizedKeyComputer(), aggregatorFactory.createAggregator(ctx, inRecDesc,
+                                    recordDescriptors[0], keyFields, storedKeyFields), inRecDesc, recordDescriptors[0]);
+                    accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        serializableGroupHashtable.insert(accessor, i);
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    serializableGroupHashtable.finishup();
+                    AggregateActivityState state = new AggregateActivityState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+                    state.gTable = serializableGroupHashtable;
+                    ctx.setStateObject(state);
+                }
+            };
+        }
+    }
+
+    private class MergeActivity extends AbstractActivityNode {
+
+        private static final long serialVersionUID = 1L;
+
+        public MergeActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+                throws HyracksDataException {
+
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                public void initialize() throws HyracksDataException {
+
+                    AggregateActivityState aggState = (AggregateActivityState) ctx.getStateObject(new TaskId(
+                            new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition));
+
+                    LinkedList<RunFileReader> runs = aggState.gTable.getRunFileReaders();
+
+                    writer.open();
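+                    // If the whole aggregation fit in memory there are no run files, so
+                    // the hash table contents are flushed directly to the writer;
+                    // otherwise the spilled runs are merged (and re-aggregated) below.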
+                    if (runs.size() <= 0) {
+                        aggState.gTable.flushHashtableToOutput(writer);
+                        aggState.gTable.close();
+                    } else {
+                        aggState.gTable.close();
+
+                        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                        for (int i = 0; i < comparatorFactories.length; i++) {
+                            comparators[i] = comparatorFactories[i].createBinaryComparator();
+                        }
+
+                        HybridHashSortRunMerger merger = new HybridHashSortRunMerger(ctx, runs, storedKeyFields,
+                                comparators, recordDescriptors[0], mergeTpcf.createPartitioner(),
+                                mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
+                                        storedKeyFields, storedKeyFields), framesLimit, tableSize, writer,
+                                isLoadOptimized);
+
+                        merger.process();
+                    }
+
+                    writer.close();
+                }
+
+            };
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
new file mode 100644
index 0000000..1de2237
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
@@ -0,0 +1,488 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
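+/**
+ * Merges the bucket-ordered run files produced by the hash-sort grouper. Runs are
+ * combined through a bucket-based priority queue, re-aggregating tuples that share
+ * the same grouping keys, possibly over multiple merge levels when the number of
+ * runs exceeds the frame budget.
+ */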
+public class HybridHashSortGrouperBucketMerge {
+
+    private final int[] keyFields;
+    private final IBinaryComparator[] comparators;
+
+    private final IAggregatorDescriptor merger;
+    private final AggregateState mergeState;
+
+    private final int framesLimit, tableSize;
+
+    private final RecordDescriptor inRecDesc;
+
+    private final IHyracksTaskContext ctx;
+
+    private final ArrayTupleBuilder tupleBuilder;
+
+    private final IFrameWriter outputWriter;
+
+    private final ITuplePartitionComputer tpc;
+
+    private final boolean isLoadOptimized;
+
+    List<ByteBuffer> inFrames;
+    ByteBuffer outFrame, writerFrame;
+    FrameTupleAppender outAppender, writerAppender;
+    LinkedList<RunFileReader> runs;
+    ArrayTupleBuilder finalTupleBuilder;
+    FrameTupleAccessor outFrameAccessor;
+    int[] currentFrameIndexInRun, currentRunFrames, currentBucketInRun;
+    int runFrameLimit = 1;
+
+    public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+            ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
+            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter)
+            throws HyracksDataException {
+        // Same as the full constructor below, with load optimization enabled by default.
+        this(ctx, keyFields, framesLimit, tableSize, tpc, comparators, merger, inRecDesc, outRecDesc, outputWriter,
+                true);
+    }
+
+    public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+            ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
+            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter, boolean loadOptimized)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        this.framesLimit = framesLimit;
+        this.tableSize = tableSize;
+
+        this.keyFields = keyFields;
+        this.comparators = comparators;
+        this.merger = merger;
+        this.mergeState = merger.createAggregateStates();
+
+        this.inRecDesc = inRecDesc;
+
+        this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
+
+        this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
+
+        this.outputWriter = outputWriter;
+
+        this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+
+        this.tpc = tpc;
+
+        this.isLoadOptimized = loadOptimized;
+    }
+
+    public void initialize(LinkedList<RunFileReader> runFiles) throws HyracksDataException {
+
+        runs = runFiles;
+
+        try {
+            if (runs.size() <= 0) {
+                return;
+            } else {
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.allocateFrame();
+                outAppender.reset(outFrame, true);
+                outFrameAccessor.reset(outFrame);
+                int runProcOffset = 0;
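+                // Merge until all runs are consumed. Each doPass() merges up to
+                // (framesLimit - 2) runs starting at runProcOffset; when the remaining
+                // runs fit in a single pass the offset is reset so the final pass reads
+                // every run and writes directly to the output writer, otherwise the
+                // offset skips the run just produced so it is merged at the next level.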
+                while (runs.size() > 0) {
+                    try {
+                        doPass(runs, runProcOffset);
+                        if (runs.size() + 2 <= framesLimit) {
+                            // final phase
+                            runProcOffset = 0;
+                        } else {
+                            // one more merge level
+                            runProcOffset++;
+                        }
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                inFrames.clear();
+            }
+        } catch (Exception e) {
+            outputWriter.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            mergeState.close();
+        }
+    }
+
+    private void doPass(LinkedList<RunFileReader> runs, int offset) throws HyracksDataException {
+        FileReference newRun = null;
+        IFrameWriter writer = outputWriter;
+        boolean finalPass = false;
+
+        int runNumber = runs.size() - offset;
+
+        while (inFrames.size() + 2 < framesLimit) {
+            inFrames.add(ctx.allocateFrame());
+        }
+
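+        // If the remaining runs (plus an output frame and a spare) fit within the frame
+        // budget, this is the final pass and each run may buffer several frames
+        // (runFrameLimit) when load optimization is on; otherwise only framesLimit - 2
+        // runs are merged into a new intermediate run file, one buffered frame per run.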
+        if (runNumber + 2 <= framesLimit) {
+            finalPass = true;
+            if (isLoadOptimized)
+                runFrameLimit = (framesLimit - 2) / runNumber;
+            else
+                runFrameLimit = 1;
+        } else {
+            runFrameLimit = 1;
+            runNumber = framesLimit - 2;
+            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+                    HybridHashSortGrouperBucketMerge.class.getSimpleName());
+            writer = new RunFileWriter(newRun, ctx.getIOManager());
+            writer.open();
+        }
+        try {
+            currentFrameIndexInRun = new int[runNumber];
+            currentRunFrames = new int[runNumber];
+            currentBucketInRun = new int[runNumber];
+            /**
+             * Create file readers for the input run files, but only for the
+             * runs whose frames fit into the inFrames buffer pool.
+             */
+            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+            Comparator<ReferenceHashEntry> comparator = createEntryComparator(comparators);
+            ReferencedBucketBasedPriorityQueue topTuples = new ReferencedBucketBasedPriorityQueue(ctx.getFrameSize(),
+                    inRecDesc, runNumber, comparator, tpc, tableSize);
+            /**
+             * current tuple index in each run
+             */
+            int[] tupleIndices = new int[runNumber];
+
+            for (int i = 0; i < runNumber; i++) {
+                int runIndex = i + offset;
+                tupleIndices[i] = 0;
+                // Load the run file
+                runFileReaders[i] = runs.get(runIndex);
+                runFileReaders[i].open();
+
+                currentRunFrames[i] = 0;
+                currentFrameIndexInRun[i] = i * runFrameLimit;
+                for (int j = 0; j < runFrameLimit; j++) {
+                    int frameIndex = currentFrameIndexInRun[i] + j;
+                    boolean hasNextFrame = runFileReaders[i].nextFrame(inFrames.get(frameIndex));
+                    if (hasNextFrame) {
+                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                        currentRunFrames[i]++;
+                        if (j == 0) {
+                            currentBucketInRun[i] = tpc.partition(tupleAccessors[frameIndex], tupleIndices[i],
+                                    tableSize);
+                            setNextTopTuple(i, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            /**
+             * Start merging
+             */
+            while (!topTuples.areRunsExhausted()) {
+                /**
+                 * Get the top record
+                 */
+                ReferenceEntry top = topTuples.peek();
+                int tupleIndex = top.getTupleIndex();
+                int runIndex = top.getRunid();
+
+                FrameTupleAccessor fta = top.getAccessor();
+
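+                // Runs are consumed in (bucket, key) order, so a new group is detected by
+                // comparing the queue's top tuple against the last tuple appended to the
+                // out frame; tuples with matching keys are aggregated into that tuple.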
+                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                if (currentTupleInOutFrame < 0
+                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+
+                    tupleBuilder.reset();
+
+                    for (int k = 0; k < keyFields.length; k++) {
+                        tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
+                    }
+
+                    merger.init(tupleBuilder, fta, tupleIndex, mergeState);
+
+                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                        flushOutFrame(writer, finalPass);
+                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "The partial result is too large to be initialized in a frame.");
+                        }
+                    }
+
+                } else {
+                    /**
+                     * The new tuple belongs to the same group as the last tuple
+                     * in the outFrame, so aggregate (merge) it into that tuple
+                     * instead of starting a new group.
+                     */
+
+                    merger.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, mergeState);
+
+                }
+                tupleIndices[runIndex]++;
+                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+            }
+
+            if (outAppender.getTupleCount() > 0) {
+                flushOutFrame(writer, finalPass);
+                outAppender.reset(outFrame, true);
+            }
+
+            merger.close();
+
+            runs.subList(offset, offset + runNumber).clear();
+            /**
+             * insert the new run file into the beginning of the run
+             * file list
+             */
+            if (!finalPass) {
+                runs.add(offset, ((RunFileWriter) writer).createReader());
+            }
+        } finally {
+            if (!finalPass) {
+                writer.close();
+            }
+            mergeState.reset();
+        }
+    }
+
+    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+        if (finalTupleBuilder == null) {
+            finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
+        }
+
+        if (writerFrame == null) {
+            writerFrame = ctx.allocateFrame();
+        }
+
+        if (writerAppender == null) {
+            writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+        }
+        writerAppender.reset(writerFrame, true);
+
+        outFrameAccessor.reset(outFrame);
+
+        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+            finalTupleBuilder.reset();
+
+            for (int k = 0; k < keyFields.length; k++) {
+                finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
+            }
+
+            if (isFinal) {
+
+                merger.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
+
+            } else {
+
+                merger.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
+            }
+
+            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                FrameUtils.flushFrame(writerFrame, writer);
+                writerAppender.reset(writerFrame, true);
+                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+                }
+            }
+        }
+        if (writerAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writerFrame, writer);
+            writerAppender.reset(writerFrame, true);
+        }
+
+        outAppender.reset(outFrame, true);
+    }
+
+    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedBucketBasedPriorityQueue topTuples)
+            throws HyracksDataException {
+        int runStart = runIndex * runFrameLimit;
+        boolean existNext = false;
+        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+            /**
+             * run already closed
+             */
+            existNext = false;
+        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+            /**
+             * not the last frame for this run
+             */
+            existNext = true;
+            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+                tupleIndices[runIndex] = 0;
+                currentFrameIndexInRun[runIndex]++;
+            }
+        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+            /**
+             * still within the last buffered frame of this run, and it has more tuples to process
+             */
+            existNext = true;
+        } else {
+            /**
+             * All tuples in the currently buffered frames of this run have been
+             * consumed; rewind to the run's first buffer slot and reload a batch of frames.
+             */
+            tupleIndices[runIndex] = 0;
+            currentFrameIndexInRun[runIndex] = runStart;
+            /**
+             * read in batch
+             */
+            currentRunFrames[runIndex] = 0;
+            for (int j = 0; j < runFrameLimit; j++) {
+                int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                    existNext = true;
+                    currentRunFrames[runIndex]++;
+                } else {
+                    break;
+                }
+            }
+        }
+
+        if (existNext) {
+            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Close the run file, and also the corresponding readers and
+     * input frame.
+     * 
+     * @param index
+     * @param runCursors
+     * @param tupleAccessor
+     * @throws HyracksDataException
+     */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+            runCursors[index] = null;
+            int frameOffset = index * runFrameLimit;
+            for (int j = 0; j < runFrameLimit; j++) {
+                tupleAccessor[frameOffset + j] = null;
+            }
+        }
+    }
+
+    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+        byte[] b1 = fta1.getBuffer().array();
+        byte[] b2 = fta2.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = f;
+            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+            int l1 = fta1.getFieldLength(j1, fIdx);
+            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+            int l2 = fta2.getFieldLength(j2, fIdx);
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private Comparator<ReferenceHashEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceHashEntry>() {
+
+            @Override
+            public int compare(ReferenceHashEntry o1, ReferenceHashEntry o2) {
+                int cmp = o1.getHashValue() - o2.getHashValue();
+                if (cmp != 0) {
+                    return cmp;
+                } else {
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+                    int j1 = o1.getTupleIndex();
+                    int j2 = o2.getTupleIndex();
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        int fIdx = f;
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
new file mode 100644
index 0000000..8695e0b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class HybridHashSortRunMerger {
+
+    private final IHyracksTaskContext ctx;
+    private final List<RunFileReader> runs;
+    private final int[] keyFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final int framesLimit;
+    private final int tableSize;
+    private final IFrameWriter writer;
+    private final IAggregatorDescriptor grouper;
+    private final ITuplePartitionComputer tpc;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+    private final boolean isLoadBuffered;
+
+    public HybridHashSortRunMerger(IHyracksTaskContext ctx, LinkedList<RunFileReader> runs, int[] keyFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDesc, ITuplePartitionComputer tpc,
+            IAggregatorDescriptor grouper, int framesLimit, int tableSize, IFrameWriter writer, boolean isLoadBuffered) {
+        this.ctx = ctx;
+        this.runs = runs;
+        this.keyFields = keyFields;
+        this.comparators = comparators;
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+        this.isLoadBuffered = isLoadBuffered;
+        this.tableSize = tableSize;
+        this.tpc = tpc;
+        this.grouper = grouper;
+    }
+
+    public void process() throws HyracksDataException {
+
+        writer.open();
+        // FIXME
+        int mergeLevels = 0, mergeRunCount = 0;
+        try {
+
+            outFrame = ctx.allocateFrame();
+            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+            outFrameAppender.reset(outFrame, true);
+
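+            // While more runs remain than can be merged in one pass (at most
+            // framesLimit - 1 input runs plus one output frame), run intermediate merge
+            // levels that fold groups of adjacent runs into new run files; the final
+            // merge below then writes the surviving runs directly to the output writer.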
+            int maxMergeWidth = framesLimit - 1;
+            while (runs.size() > maxMergeWidth) {
+                int generationSeparator = 0;
+                // FIXME
+                int mergeRounds = 0;
+                while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+                    int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth), runs.size()
+                            - maxMergeWidth + 1);
+                    FileReference newRun = ctx.createManagedWorkspaceFile(HybridHashSortRunMerger.class
+                            .getSimpleName());
+                    IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+                    mergeResultWriter.open();
+                    IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+                    for (int i = 0; i < mergeWidth; i++) {
+                        runCursors[i] = runs.get(generationSeparator + i);
+                    }
+                    merge(mergeResultWriter, runCursors, false);
+                    runs.subList(generationSeparator, generationSeparator + mergeWidth).clear();
+                    runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
+                    mergeRounds++;
+                }
+                mergeLevels++;
+                mergeRunCount += mergeRounds;
+            }
+            if (!runs.isEmpty()) {
+                IFrameReader[] runCursors = new RunFileReader[runs.size()];
+                for (int i = 0; i < runCursors.length; i++) {
+                    runCursors[i] = runs.get(i);
+                }
+                merge(writer, runCursors, true);
+            }
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+
+            ctx.getCounterContext()
+                    .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.runs.count", true)
+                    .set(mergeRunCount);
+
+            ctx.getCounterContext()
+                    .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.levels", true)
+                    .set(mergeLevels);
+        }
+    }
+
+    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors, boolean isFinal)
+            throws HyracksDataException {
+        // FIXME
+        long methodTimer = System.nanoTime();
+
+        IFrameReader merger = new GroupRunMergingFrameReader(ctx, runCursors, framesLimit, tableSize, keyFields, tpc,
+                comparators, grouper, recordDesc, isFinal, isLoadBuffered);
+        merger.open();
+        try {
+            while (merger.nextFrame(outFrame)) {
+                FrameUtils.flushFrame(outFrame, mergeResultWriter);
+            }
+        } finally {
+            merger.close();
+        }
+        ctx.getCounterContext()
+                .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.time", true)
+                .update(System.nanoTime() - methodTimer);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
new file mode 100644
index 0000000..3c91fea
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class ReferenceEntryWithBucketID extends ReferenceEntry {
+
+    private int bucketID;
+
+    public ReferenceEntryWithBucketID(int runid, FrameTupleAccessor fta, int tupleIndex, int bucketID) {
+        super(runid, fta, tupleIndex);
+        this.bucketID = bucketID;
+    }
+
+    public int getBucketID() {
+        return bucketID;
+    }
+
+    public void setBucketID(int bucketID) {
+        this.bucketID = bucketID;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
new file mode 100644
index 0000000..394f0a8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class ReferenceHashEntry extends ReferenceEntry {
+
+    private int hashValue;
+
+    public ReferenceHashEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int hashVal) {
+        super(runid, fta, tupleIndex);
+        this.hashValue = hashVal;
+    }
+
+    public int getHashValue() {
+        return hashValue;
+    }
+
+    public void setHashValue(int hashVal) {
+        this.hashValue = hashVal;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
new file mode 100644
index 0000000..adfbe81
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
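+/**
+ * Priority queue over the run files of a hash-sort grouper. Entries are ordered
+ * first by the hash bucket of their current tuple (computed through the supplied
+ * ITuplePartitionComputer) and then by the grouping keys, so the merge produces
+ * tuples in (bucket, key) order. Typical usage during a merge pass (a sketch only,
+ * mirroring HybridHashSortGrouperBucketMerge.doPass):
+ * 
+ * <pre>
+ * while (!topTuples.areRunsExhausted()) {
+ *     ReferenceEntry top = topTuples.peek();
+ *     // aggregate or emit the top tuple, then advance the winning run:
+ *     // popAndReplace(nextAccessor, nextTupleIndex) if the run has more tuples,
+ *     // or pop() once the run is exhausted.
+ * }
+ * </pre>
+ */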
+public class ReferencedBucketBasedPriorityQueue {
+
+    private final int frameSize;
+    private final RecordDescriptor recordDescriptor;
+    private final ReferenceHashEntry entries[];
+    private final int size;
+    private final BitSet runAvail;
+    private int nItems;
+    private final int tableSize;
+
+    private final Comparator<ReferenceHashEntry> comparator;
+
+    private final ITuplePartitionComputer tpc;
+
+    public ReferencedBucketBasedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
+            Comparator<ReferenceHashEntry> comparator, ITuplePartitionComputer tpc, int tableSize) {
+        this.frameSize = frameSize;
+        this.recordDescriptor = recordDescriptor;
+        if (initSize < 1)
+            throw new IllegalArgumentException();
+        this.comparator = comparator;
+        nItems = initSize;
+        size = (initSize + 1) & 0xfffffffe;
+        entries = new ReferenceHashEntry[size];
+        runAvail = new BitSet(size);
+        runAvail.set(0, initSize, true);
+        for (int i = 0; i < size; i++) {
+            entries[i] = new ReferenceHashEntry(i, null, -1, -1);
+        }
+        this.tpc = tpc;
+        this.tableSize = tableSize;
+    }
+
+    /**
+     * Retrieve the top entry without removing it
+     * 
+     * @return the top entry
+     */
+    public ReferenceEntry peek() {
+        return entries[0];
+    }
+
+    /**
+     * Replace the top entry with the next tuple from the same run and sift it
+     * down to its position in the queue.
+     * 
+     * @param fta
+     *            accessor of the frame containing the replacement tuple
+     * @param tIndex
+     *            index of the replacement tuple in that frame
+     * @return the run id of the replaced entry
+     * @throws HyracksDataException
+     */
+    public int popAndReplace(FrameTupleAccessor fta, int tIndex) throws HyracksDataException {
+        ReferenceHashEntry entry = entries[0];
+        if (entry.getAccessor() == null) {
+            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
+        }
+        entry.getAccessor().reset(fta.getBuffer());
+        entry.setTupleIndex(tIndex);
+        entry.setHashValue(tpc.partition(fta, tIndex, tableSize));
+
+        add(entry);
+        return entry.getRunid();
+    }
+
+    /**
+     * Push entry into priority queue
+     * 
+     * @param e
+     *            the new Entry
+     * @throws HyracksDataException
+     */
+    private void add(ReferenceHashEntry e) throws HyracksDataException {
+        ReferenceHashEntry min = entries[0];
+        int slot = (size >> 1) + (min.getRunid() >> 1);
+
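+        // The entries array forms a tournament (loser) tree: entries[0] is the current
+        // overall winner and each internal slot keeps the loser of a past comparison.
+        // The new entry enters at the leaf slot of the run it replaces and is played
+        // toward the root; at every slot the loser stays and the winner moves up.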
+        ReferenceHashEntry curr = e;
+        while (!runAvail.isEmpty() && slot > 0) {
+            int c = 0;
+            if (!runAvail.get(entries[slot].getRunid())) {
+                // run of entries[slot] is exhausted, i.e. not available, curr
+                // wins
+                c = 1;
+            } else if (entries[slot].getAccessor() != null // entries[slot] is not the MIN sentinel
+                    && runAvail.get(curr.getRunid())) { // and curr's run is still available
+
+                if (curr.getAccessor() != null) {
+                    c = comparator.compare(entries[slot], curr);
+                } else {
+                    // curr is MIN value, wins
+                    c = 1;
+                }
+            }
+
+            if (c <= 0) { // curr lost
+                // entries[slot] swaps up
+                ReferenceHashEntry tmp = entries[slot];
+                entries[slot] = curr;
+                curr = tmp;// winner to pass up
+            }// else curr wins
+            slot >>= 1;
+        }
+        // set new entries[0]
+        entries[0] = curr;
+    }
+
+    /**
+     * Pop the top entry; this is called only when its run has been exhausted.
+     * The run is marked unavailable so it no longer competes in the queue.
+     * 
+     * @return the popped entry
+     * @throws HyracksDataException
+     */
+    public ReferenceHashEntry pop() throws HyracksDataException {
+        ReferenceHashEntry min = entries[0];
+        runAvail.clear(min.getRunid());
+        add(min);
+        nItems--;
+        return min;
+    }
+
+    public boolean areRunsExhausted() {
+        return runAvail.isEmpty();
+    }
+
+    public int size() {
+        return nItems;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
new file mode 100644
index 0000000..d9d5118
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
@@ -0,0 +1,133 @@
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * TODO: needs to be merged with the ReferencedPriorityQueue in the util package
+ */
+public class ReferencedPriorityQueue {
+    private final int frameSize;
+    private final RecordDescriptor recordDescriptor;
+    private final ReferenceEntryWithBucketID entries[];
+    private final int size;
+    private final BitSet runAvail;
+    private int nItems;
+
+    private final Comparator<ReferenceEntryWithBucketID> comparator;
+
+    public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
+            Comparator<ReferenceEntryWithBucketID> comparator) {
+        this.frameSize = frameSize;
+        this.recordDescriptor = recordDescriptor;
+        if (initSize < 1)
+            throw new IllegalArgumentException();
+        this.comparator = comparator;
+        nItems = initSize;
+        size = (initSize + 1) & 0xfffffffe;
+        entries = new ReferenceEntryWithBucketID[size];
+        runAvail = new BitSet(size);
+        runAvail.set(0, initSize, true);
+        for (int i = 0; i < size; i++) {
+            entries[i] = new ReferenceEntryWithBucketID(i, null, -1, -1);
+        }
+    }
+
+    /**
+     * Retrieve the top entry without removing it
+     * 
+     * @return the top entry
+     */
+    public ReferenceEntryWithBucketID peek() {
+        return entries[0];
+    }
+
+    /**
+     * Replace the top entry with the next tuple from the same run and sift it
+     * down to its position in the queue.
+     * 
+     * @param fta
+     *            accessor of the frame containing the replacement tuple
+     * @param tIndex
+     *            index of the replacement tuple in that frame
+     * @param bucketID
+     *            hash bucket of the replacement tuple
+     * @return the run id of the replaced entry
+     */
+    public int popAndReplace(FrameTupleAccessor fta, int tIndex, int bucketID) {
+        ReferenceEntryWithBucketID entry = entries[0];
+        if (entry.getAccessor() == null) {
+            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
+        }
+        entry.getAccessor().reset(fta.getBuffer());
+        entry.setTupleIndex(tIndex);
+        entry.setBucketID(bucketID);
+
+        add(entry);
+        return entry.getRunid();
+    }
+
+    /**
+     * Push entry into priority queue
+     * 
+     * @param e
+     *            the new Entry
+     */
+    private void add(ReferenceEntryWithBucketID e) {
+        ReferenceEntryWithBucketID min = entries[0];
+        int slot = (size >> 1) + (min.getRunid() >> 1);
+
+        ReferenceEntryWithBucketID curr = e;
+        while (!runAvail.isEmpty() && slot > 0) {
+            int c = 0;
+            if (!runAvail.get(entries[slot].getRunid())) {
+                // run of entries[slot] is exhausted, i.e. not available, curr
+                // wins
+                c = 1;
+            } else if (entries[slot].getAccessor() != null // entries[slot] is not the MIN sentinel
+                    && runAvail.get(curr.getRunid())) { // and curr's run is still available
+
+                if (curr.getAccessor() != null) {
+                    c = comparator.compare(entries[slot], curr);
+                } else {
+                    // curr is MIN value, wins
+                    c = 1;
+                }
+            }
+
+            if (c <= 0) { // curr lost
+                // entries[slot] swaps up
+                ReferenceEntryWithBucketID tmp = entries[slot];
+                entries[slot] = curr;
+                curr = tmp;// winner to pass up
+            }// else curr wins
+            slot >>= 1;
+        }
+        // set new entries[0]
+        entries[0] = curr;
+    }
+
+    /**
+     * Pop the top entry; this is called only when its run has been exhausted.
+     * The run is marked unavailable so it no longer competes in the queue.
+     * 
+     * @return the popped entry
+     */
+    public ReferenceEntryWithBucketID pop() {
+        ReferenceEntryWithBucketID min = entries[0];
+        runAvail.clear(min.getRunid());
+        add(min);
+        nItems--;
+        return min;
+    }
+
+    public boolean areRunsExhausted() {
+        return runAvail.isEmpty();
+    }
+
+    public int size() {
+        return nItems;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
new file mode 100644
index 0000000..72bae76
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
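+/**
+ * Frame tuple accessor for group hash table frames. Each tuple in such a frame is
+ * followed by a two-integer hash reference (next frame index, next tuple index)
+ * that chains tuples of the same hash bucket, so the logical tuple end offset
+ * excludes these trailing 2 * INT_SIZE bytes.
+ */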
+public class FrameTupleAccessorForGroupHashtable implements IFrameTupleAccessor {
+    private final int frameSize;
+    private final RecordDescriptor recordDescriptor;
+
+    private final static int INT_SIZE = 4;
+
+    private ByteBuffer buffer;
+
+    public FrameTupleAccessorForGroupHashtable(int frameSize, RecordDescriptor recordDescriptor) {
+        this.frameSize = frameSize;
+        this.recordDescriptor = recordDescriptor;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldCount()
+     */
+    @Override
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldSlotsLength()
+     */
+    @Override
+    public int getFieldSlotsLength() {
+        return getFieldCount() * 4;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldEndOffset(int, int)
+     */
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldStartOffset(int, int)
+     */
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldLength(int, int)
+     */
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleEndOffset(int)
+     */
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1)) - 2 * INT_SIZE;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleStartOffset(int)
+     */
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleCount()
+     */
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getBuffer()
+     */
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#reset(java.nio.ByteBuffer)
+     */
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int getTupleHashReferenceOffset(int tupleIndex) {
+        return getTupleEndOffset(tupleIndex);
+    }
+
+    public int getTupleEndOffsetWithHashReference(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+    }
+
+    public int getHashReferenceNextFrameIndex(int tupleIndex) {
+        return buffer.getInt(getTupleHashReferenceOffset(tupleIndex));
+    }
+
+    public int getHashReferenceNextTupleIndex(int tupleIndex) {
+        return buffer.getInt(getTupleHashReferenceOffset(tupleIndex) + INT_SIZE);
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
new file mode 100644
index 0000000..c5668f5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
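+/**
+ * Frame tuple appender that reserves a two-integer hash reference after every
+ * appended tuple (both fields initialized to -1, i.e. end of the bucket chain),
+ * matching the layout expected by FrameTupleAccessorForGroupHashtable.
+ */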
+public class FrameTupleAppenderForGroupHashtable {
+    private final int frameSize;
+
+    private ByteBuffer buffer;
+
+    private int tupleCount;
+
+    private int tupleDataEndOffset;
+
+    public FrameTupleAppenderForGroupHashtable(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    public void reset(ByteBuffer buffer, boolean clear) {
+        this.buffer = buffer;
+        if (clear) {
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
+            tupleCount = 0;
+            tupleDataEndOffset = 0;
+        } else {
+            tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+            tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)
+                    - tupleCount * 4);
+        }
+    }
+
+    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            for (int i = 0; i < fieldSlots.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+            }
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 4, length);
+            buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length, -1);
+            buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length + 4, -1);
+            tupleDataEndOffset += fieldSlots.length * 4 + length + 2 * 4;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+            buffer.putInt(tupleDataEndOffset + length, -1);
+            buffer.putInt(tupleDataEndOffset + length + 4, -1);
+            tupleDataEndOffset += length + 2 * 4;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            int effectiveSlots = 0;
+            for (int i = 0; i < fieldSlots.length; ++i) {
+                if (fieldSlots[i] > 0) {
+                    buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+                    effectiveSlots++;
+                }
+            }
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
+            buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length, -1);
+            buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length + 4, -1);
+            tupleDataEndOffset += effectiveSlots * 4 + length + 2 * 4;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
+        int length = tEndOffset - tStartOffset;
+        if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            ByteBuffer src = tupleAccessor.getBuffer();
+            System.arraycopy(src.array(), tStartOffset, buffer.array(), tupleDataEndOffset, length);
+            buffer.putInt(tupleDataEndOffset + length, -1);
+            buffer.putInt(tupleDataEndOffset + length + 4, -1);
+            tupleDataEndOffset += length + 2 * 4;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) {
+        int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
+        int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
+        return append(tupleAccessor, tStartOffset, tEndOffset);
+    }
+
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+}
+
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
new file mode 100644
index 0000000..be92b84
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HybridHashGroupHashTable implements IFrameWriter {
+
+    private final static int HEADER_REF_EMPTY = -1;
+
+    private static final int INT_SIZE = 4;
+
+    private IHyracksTaskContext ctx;
+
+    private final int frameSize;
+
+    private final int framesLimit;
+
+    private final int tableSize;
+
+    private final int numOfPartitions;
+
+    private final IFrameWriter outputWriter;
+
+    private final IBinaryComparator[] comparators;
+
+    /**
+     * indexes of the key fields in the input records (inputKeys) and in the stored, aggregated records (internalKeys)
+     */
+    private final int[] inputKeys, internalKeys;
+
+    private final RecordDescriptor inputRecordDescriptor, outputRecordDescriptor;
+
+    /**
+     * tuple partition computer used for hashing records into hash table buckets
+     */
+    private final ITuplePartitionComputer hashComputer;
+
+    /**
+     * tuple partition computer used for assigning spilled records to partitions
+     */
+    private final ITuplePartitionComputer partitionComputer;
+
+    /**
+     * Hash table headers
+     */
+    private ByteBuffer[] headers;
+
+    /**
+     * buffers for hash table
+     */
+    private ByteBuffer[] contents;
+
+    /**
+     * output buffers for spilled partitions
+     */
+    private ByteBuffer[] spilledPartOutputBuffers;
+
+    /**
+     * run writers for spilled partitions
+     */
+    private RunFileWriter[] spilledPartRunWriters;
+
+    private int[] spilledPartRunSizeArrayInFrames;
+    private int[] spilledPartRunSizeArrayInTuples;
+
+    private List<IFrameReader> spilledPartRunReaders = null;
+    private List<Integer> spilledRunAggregatedPages = null;
+    private List<Integer> spilledPartRunSizesInFrames = null;
+    private List<Integer> spilledPartRunSizesInTuples = null;
+
+    /**
+     * index of the current working buffer in hash table
+     */
+    private int currentHashTableFrame;
+
+    /**
+     * Aggregation state
+     */
+    private AggregateState htAggregateState;
+
+    /**
+     * the aggregator
+     */
+    private final IAggregatorDescriptor aggregator;
+
+    /**
+     * records inserted into the in-memory hash table (for hashing and aggregation)
+     */
+    private int hashedRawRecords = 0;
+
+    /**
+     * number of unique keys aggregated in the in-memory hash table
+     */
+    private int hashedKeys = 0;
+
+    /**
+     * Hash table tuple pointer
+     */
+    private TuplePointer matchPointer;
+
+    /**
+     * Frame tuple accessor for input data frames
+     */
+    private FrameTupleAccessor inputFrameTupleAccessor;
+
+    /**
+     * flag for whether the hash table is full
+     */
+    private boolean isHashtableFull;
+
+    /**
+     * flag for partition-only mode (no hashing or aggregation)
+     */
+    private boolean isPartitionOnly;
+
+    /**
+     * Tuple accessor for hash table contents
+     */
+    private FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
+
+    private ArrayTupleBuilder internalTupleBuilder;
+
+    private FrameTupleAppender spilledPartInsertAppender;
+
+    private FrameTupleAppenderForGroupHashtable htInsertAppender;
+
+    public HybridHashGroupHashTable(IHyracksTaskContext ctx, int framesLimit, int tableSize, int numOfPartitions,
+            int[] keys, int hashSeedOffset, IBinaryComparator[] comparators, ITuplePartitionComputerFamily tpcFamily,
+            IAggregatorDescriptor aggregator, RecordDescriptor inputRecordDescriptor,
+            RecordDescriptor outputRecordDescriptor, IFrameWriter outputWriter) throws HyracksDataException {
+        this.ctx = ctx;
+        this.frameSize = ctx.getFrameSize();
+        this.tableSize = tableSize;
+        this.framesLimit = framesLimit;
+        this.numOfPartitions = numOfPartitions;
+        this.inputKeys = keys;
+        this.internalKeys = new int[keys.length];
+        for (int i = 0; i < internalKeys.length; i++) {
+            internalKeys[i] = i;
+        }
+
+        this.comparators = comparators;
+
+        this.inputRecordDescriptor = inputRecordDescriptor;
+        this.outputRecordDescriptor = outputRecordDescriptor;
+
+        this.outputWriter = outputWriter;
+
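+        // two distinct hash functions are derived from the seed offset at each level: one for
+        // bucket assignment in the hash table and one for spill partitioning, keeping the two
+        // hash decisions independent of each other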
+        this.hashComputer = tpcFamily.createPartitioner(hashSeedOffset * 2);
+        this.partitionComputer = tpcFamily.createPartitioner(hashSeedOffset * 2 + 1);
+
+        this.aggregator = aggregator;
+
+    }
+
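+    /**
+     * Estimate the hash table overhead ratio (used as part of the fudge factor): the total memory budget in
+     * bytes divided by the raw record bytes that fit into the table, accounting for the header frames and for
+     * the two extra ints stored with each record.
+     */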
+    public static double getHashtableOverheadRatio(int tableSize, int frameSize, int framesLimit, int recordSizeInByte) {
+        int pagesForRecord = framesLimit - getHeaderPages(tableSize, frameSize);
+        int recordsInHashtable = (pagesForRecord - 1) * ((int) (frameSize / (recordSizeInByte + 2 * INT_SIZE)));
+
+        return (double) framesLimit * frameSize / recordsInHashtable / recordSizeInByte;
+    }
+
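+    /**
+     * Number of frames needed for the hash table headers: each of the tableSize entries stores
+     * two ints (the frame index and tuple index of the first record in its bucket).
+     */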
+    public static int getHeaderPages(int tableSize, int frameSize) {
+        return (int) Math.ceil((double) tableSize * INT_SIZE * 2 / frameSize);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        // initialize hash headers
+        int htHeaderCount = getHeaderPages(tableSize, frameSize);
+
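+        // if the requested number of spill partitions leaves no frames for hash table contents,
+        // fall back to pure partitioning (no hashing or aggregation at this level)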
+        isPartitionOnly = false;
+        if (numOfPartitions >= framesLimit - htHeaderCount) {
+            isPartitionOnly = true;
+        }
+
+        if (isPartitionOnly) {
+            htHeaderCount = 0;
+        }
+
+        headers = new ByteBuffer[htHeaderCount];
+
+        // initialize hash table contents
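+        // the frame budget is split between header frames, content frames, and one output buffer per spilled partition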
+        contents = new ByteBuffer[framesLimit - htHeaderCount - numOfPartitions];
+        currentHashTableFrame = 0;
+        isHashtableFull = false;
+
+        // initialize hash table aggregate state
+        htAggregateState = aggregator.createAggregateStates();
+
+        // initialize partition information
+        spilledPartOutputBuffers = new ByteBuffer[numOfPartitions];
+        spilledPartRunWriters = new RunFileWriter[numOfPartitions];
+        spilledPartRunSizeArrayInFrames = new int[numOfPartitions];
+        spilledPartRunSizeArrayInTuples = new int[numOfPartitions];
+
+        // initialize other helper classes
+        inputFrameTupleAccessor = new FrameTupleAccessor(frameSize, inputRecordDescriptor);
+        internalTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
+        spilledPartInsertAppender = new FrameTupleAppender(frameSize);
+
+        htInsertAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
+        matchPointer = new TuplePointer();
+        hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outputRecordDescriptor);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputFrameTupleAccessor.reset(buffer);
+        int tupleCount = inputFrameTupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            insert(inputFrameTupleAccessor, i);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (spilledPartRunWriters[i] != null) {
+                spilledPartRunWriters[i].close();
+            }
+        }
+        htAggregateState.close();
+    }
+
+    private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
+
+        if (isPartitionOnly) {
+            // for partition only
+            int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
+            insertSpilledPartition(accessor, tupleIndex, pid);
+            spilledPartRunSizeArrayInTuples[pid]++;
+            return;
+        }
+
+        int hid = hashComputer.partition(accessor, tupleIndex, tableSize);
+
+        if (findMatch(hid, accessor, tupleIndex)) {
+            // found a match: aggregate into the existing entry
+            hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+            aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex,
+                    htAggregateState);
+            hashedRawRecords++;
+        } else {
+            if (isHashtableFull) {
+                // when hash table is full: spill the record
+                int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
+                insertSpilledPartition(accessor, tupleIndex, pid);
+                spilledPartRunSizeArrayInTuples[pid]++;
+            } else {
+                // insert a new entry into the hash table
+                internalTupleBuilder.reset();
+                for (int k = 0; k < inputKeys.length; k++) {
+                    internalTupleBuilder.addField(accessor, tupleIndex, inputKeys[k]);
+                }
+
+                aggregator.init(internalTupleBuilder, accessor, tupleIndex, htAggregateState);
+
+                if (contents[currentHashTableFrame] == null) {
+                    contents[currentHashTableFrame] = ctx.allocateFrame();
+                }
+
+                htInsertAppender.reset(contents[currentHashTableFrame], false);
+                if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                    // the current content frame is full: move on to the next frame
+                    currentHashTableFrame++;
+                    if (currentHashTableFrame >= contents.length) {
+                        // no more frames to allocate: stop expanding the hash table
+                        isHashtableFull = true;
+
+                        // reinsert the record
+                        insert(accessor, tupleIndex);
+
+                        return;
+                    } else {
+                        if (contents[currentHashTableFrame] == null) {
+                            contents[currentHashTableFrame] = ctx.allocateFrame();
+                        }
+
+                        htInsertAppender.reset(contents[currentHashTableFrame], true);
+
+                        if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+                                internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "Failed to insert an aggregation partial result into the in-memory hash table: it has the length of "
+                                            + internalTupleBuilder.getSize() + " and fields "
+                                            + internalTupleBuilder.getFieldEndOffsets().length);
+                        }
+
+                    }
+                }
+
+                // update hash table reference
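+                // each bucket is a singly linked list: the header points to the first record and
+                // every record stores a reference to the next one; findMatch() left matchPointer at
+                // the tail of the bucket, so the new record is linked there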
+                if (matchPointer.frameIndex < 0) {
+                    // need to initialize the hash table header
+                    int headerFrameIndex = getHeaderFrameIndex(hid);
+                    int headerFrameOffset = getHeaderTupleIndex(hid);
+
+                    if (headers[headerFrameIndex] == null) {
+                        headers[headerFrameIndex] = ctx.allocateFrame();
+                        resetHeader(headerFrameIndex);
+                    }
+
+                    headers[headerFrameIndex].putInt(headerFrameOffset, currentHashTableFrame);
+                    headers[headerFrameIndex]
+                            .putInt(headerFrameOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
+                } else {
+                    // update the previous reference
+                    hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+                    int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
+                    contents[matchPointer.frameIndex].putInt(refOffset, currentHashTableFrame);
+                    contents[matchPointer.frameIndex]
+                            .putInt(refOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
+                }
+
+                hashedKeys++;
+                hashedRawRecords++;
+            }
+        }
+    }
+
+    /**
+     * Insert a record into a spilled partition by copying the tuple directly into the partition's output buffer.
+     * 
+     * @param accessor
+     * @param tupleIndex
+     * @param pid
+     */
+    private void insertSpilledPartition(FrameTupleAccessor accessor, int tupleIndex, int pid)
+            throws HyracksDataException {
+
+        if (spilledPartOutputBuffers[pid] == null) {
+            spilledPartOutputBuffers[pid] = ctx.allocateFrame();
+        }
+
+        spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], false);
+
+        if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
+            // the output buffer is full: flush
+            flushSpilledPartitionOutputBuffer(pid);
+            // reset the output buffer
+            spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], true);
+
+            if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
+                throw new HyracksDataException("Failed to insert a record into a spilled partition!");
+            }
+        }
+
+    }
+
+    /**
+     * Flush a spilled partition's output buffer.
+     * 
+     * @param pid
+     * @throws HyracksDataException
+     */
+    private void flushSpilledPartitionOutputBuffer(int pid) throws HyracksDataException {
+        if (spilledPartRunWriters[pid] == null) {
+            spilledPartRunWriters[pid] = new RunFileWriter(
+                    ctx.createManagedWorkspaceFile("HashHashPrePartitionHashTable"), ctx.getIOManager());
+            spilledPartRunWriters[pid].open();
+        }
+
+        FrameUtils.flushFrame(spilledPartOutputBuffers[pid], spilledPartRunWriters[pid]);
+
+        spilledPartRunSizeArrayInFrames[pid]++;
+    }
+
+    /**
+     * Look up the hash table bucket hid for an entry matching the given input tuple. On a hit, matchPointer
+     * is set to the matching entry; on a miss it points to the last record of the bucket (or (-1, -1) if the
+     * bucket is empty), so that a new record can be linked there.
+     * 
+     * @param hid
+     * @param accessor
+     * @param tupleIndex
+     * @return true if a matching entry was found
+     */
+    private boolean findMatch(int hid, FrameTupleAccessor accessor, int tupleIndex) {
+
+        matchPointer.frameIndex = -1;
+        matchPointer.tupleIndex = -1;
+
+        // get reference in the header
+        int headerFrameIndex = getHeaderFrameIndex(hid);
+        int headerFrameOffset = getHeaderTupleIndex(hid);
+
+        if (headers[headerFrameIndex] == null) {
+            return false;
+        }
+
+        // initialize the pointer to the first record 
+        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+        while (entryFrameIndex >= 0) {
+            matchPointer.frameIndex = entryFrameIndex;
+            matchPointer.tupleIndex = entryTupleIndex;
+            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+            if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
+                return true;
+            }
+            // Move to the next record in this entry following the linked list
+            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+            int prevFrameIndex = entryFrameIndex;
+            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+        }
+
+        return false;
+    }
+
+    public void finishup() throws HyracksDataException {
+        // spill all output buffers
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (spilledPartOutputBuffers[i] != null) {
+                flushSpilledPartitionOutputBuffer(i);
+            }
+        }
+        spilledPartOutputBuffers = null;
+
+        // flush the in-memory aggregation results; no extra frame cost here, since the spilled partitions' output buffers have already been released
+        ByteBuffer outputBuffer = ctx.allocateFrame();
+        FrameTupleAppender outputBufferAppender = new FrameTupleAppender(frameSize);
+        outputBufferAppender.reset(outputBuffer, true);
+
+        ArrayTupleBuilder outFlushTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
+
+        for (ByteBuffer htFrame : contents) {
+            if (htFrame == null) {
+                continue;
+            }
+            hashtableRecordAccessor.reset(htFrame);
+            int tupleCount = hashtableRecordAccessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                outFlushTupleBuilder.reset();
+
+                for (int k = 0; k < internalKeys.length; k++) {
+                    outFlushTupleBuilder.addField(hashtableRecordAccessor, i, internalKeys[k]);
+                }
+
+                aggregator.outputFinalResult(outFlushTupleBuilder, hashtableRecordAccessor, i, htAggregateState);
+
+                if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
+                        outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(outputBuffer, outputWriter);
+                    outputBufferAppender.reset(outputBuffer, true);
+
+                    if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
+                            outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
+                        throw new HyracksDataException(
+                                "Failed to flush a record from in-memory hash table: record has length of "
+                                        + outFlushTupleBuilder.getSize() + " and fields "
+                                        + outFlushTupleBuilder.getFieldEndOffsets().length);
+                    }
+                }
+            }
+        }
+
+        if (outputBufferAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outputBuffer, outputWriter);
+        }
+
+        // create run readers and statistics for the spilled runs
+        spilledPartRunReaders = new LinkedList<IFrameReader>();
+        spilledRunAggregatedPages = new LinkedList<Integer>();
+        spilledPartRunSizesInFrames = new LinkedList<Integer>();
+        spilledPartRunSizesInTuples = new LinkedList<Integer>();
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (spilledPartRunWriters[i] != null) {
+                spilledPartRunReaders.add(spilledPartRunWriters[i].createReader());
+                spilledRunAggregatedPages.add(0);
+                spilledPartRunWriters[i].close();
+                spilledPartRunSizesInFrames.add(spilledPartRunSizeArrayInFrames[i]);
+                spilledPartRunSizesInTuples.add(spilledPartRunSizeArrayInTuples[i]);
+            }
+        }
+    }
+
+    /**
+     * Compare an input record with a hash table entry.
+     * 
+     * @param accessor
+     * @param tupleIndex
+     * @param hashAccessor
+     * @param hashTupleIndex
+     * @return 0 if the grouping keys are equal; otherwise the result of the first non-equal key comparison
+     */
+    private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
+            int hashTupleIndex) {
+        int tStart0 = accessor.getTupleStartOffset(tupleIndex);
+        int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
+        int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < internalKeys.length; ++i) {
+            int fStart0 = accessor.getFieldStartOffset(tupleIndex, inputKeys[i]);
+            int fEnd0 = accessor.getFieldEndOffset(tupleIndex, inputKeys[i]);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
+            int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Get the header frame index of the given hash table entry
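+     * (each entry occupies two consecutive ints, i.e. 2 * INT_SIZE bytes, in the logical header area,
+     * and this method maps the entry's starting byte offset to the header frame holding it)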
+     * 
+     * @param entry
+     * @return
+     */
+    private int getHeaderFrameIndex(int entry) {
+        int frameIndex = (entry / frameSize * 2 * INT_SIZE) + (entry % frameSize * 2 * INT_SIZE / frameSize);
+        return frameIndex;
+    }
+
+    /**
+     * Get the byte offset of the given hash table entry within its header frame
+     * 
+     * @param entry
+     * @return
+     */
+    private int getHeaderTupleIndex(int entry) {
+        int offset = (entry % frameSize) * 2 * INT_SIZE % frameSize;
+        return offset;
+    }
+
+    /**
+     * reset the header page.
+     * 
+     * @param headerFrameIndex
+     */
+    private void resetHeader(int headerFrameIndex) {
+        for (int i = 0; i < frameSize; i += INT_SIZE) {
+            headers[headerFrameIndex].putInt(i, HEADER_REF_EMPTY);
+        }
+    }
+
+    public List<Integer> getSpilledRunsSizeInTuples() throws HyracksDataException {
+        return spilledPartRunSizesInTuples;
+    }
+
+    public int getHashedUniqueKeys() throws HyracksDataException {
+        return hashedKeys;
+    }
+
+    public int getHashedRawRecords() throws HyracksDataException {
+        return hashedRawRecords;
+    }
+
+    public List<Integer> getSpilledRunsAggregatedPages() throws HyracksDataException {
+        return spilledRunAggregatedPages;
+    }
+
+    public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
+        return spilledPartRunReaders;
+    }
+
+    public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
+        return spilledPartRunSizesInFrames;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..8a94c9f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortGroupHashTable;
+import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortRunMerger;
+
+public class HybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final double HYBRID_FALLBACK_THRESHOLD = 0.8;
+
+    // safety margin applied to the user-provided input size estimate
+    private static final double ESTIMATOR_MAGNIFIER = 1.2;
+
+    // input key fields
+    private final int[] keyFields;
+
+    // intermediate and final key fields
+    private final int[] storedKeyFields;
+
+    /**
+     * Input sizes as the count of the raw records.
+     */
+    private final long inputSizeInRawRecords;
+
+    /**
+     * Input size as the count of the unique keys.
+     */
+    private final long inputSizeInUniqueKeys;
+
+    // hash table size
+    private final int tableSize;
+
+    // estimated record size in bytes, used to compute the fudge factor
+    private final int userProvidedRecordSizeInBytes;
+
+    // aggregator
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+
+    // merger, used when falling back to the hash-sort algorithm due to hash skew
+    private final IAggregatorDescriptorFactory mergerFactory;
+
+    // for the sort fall-back algorithm
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+    // total memory in pages
+    private final int framesLimit;
+
+    // comparator factories for key fields.
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    /**
+     * hash function families for each key field: a hash function family is needed because
+     * more than one level of hashing may be required
+     */
+    private final IBinaryHashFunctionFamily[] hashFamilies;
+
+    /**
+     * flag for adjusting the input key size estimate at run time using observed statistics
+     */
+    private final boolean doInputAdjustment;
+
+    private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
+
+    public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
+            IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
+            int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor outRecDesc) throws HyracksDataException {
+        this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+                comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
+                mergerFactory, outRecDesc, true);
+    }
+
+    public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
+            IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
+            int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor outRecDesc, boolean doInputAdjustment) throws HyracksDataException {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        this.tableSize = tableSize;
+        this.userProvidedRecordSizeInBytes = recordSizeInBytes;
+
+        this.inputSizeInRawRecords = inputSizeInRawRecords;
+        this.inputSizeInUniqueKeys = inputSizeInUniqueKeys;
+
+        if (framesLimit <= 3) {
+            // need more than 3 frames: 2 for the in-memory hash table and 1 for the output buffer alone are not enough
+            throw new HyracksDataException(
+                    "Not enough memory for Hash-Hash Aggregation algorithm: more than 3 frames are necessary, but only "
+                            + framesLimit + " available.");
+        }
+
+        this.keyFields = keyFields;
+        storedKeyFields = new int[keyFields.length];
+        for (int i = 0; i < storedKeyFields.length; i++) {
+            storedKeyFields[i] = i;
+        }
+
+        this.aggregatorFactory = aggregatorFactory;
+
+        this.mergerFactory = mergerFactory;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+
+        this.comparatorFactories = comparatorFactories;
+
+        this.hashFamilies = hashFamilies;
+
+        recordDescriptors[0] = outRecDesc;
+
+        this.doInputAdjustment = doInputAdjustment;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparators.length; i++) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+
+        final int frameSize = ctx.getFrameSize();
+
+        final double fudgeFactor = HybridHashGroupHashTable.getHashtableOverheadRatio(tableSize, frameSize,
+                framesLimit, userProvidedRecordSizeInBytes) * FUDGE_FACTOR_ESTIMATION;
+
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            HybridHashGroupHashTable topProcessor;
+
+            int observedInputSizeInFrames;
+
+            int userProvidedInputSizeInFrames;
+
+            boolean topLevelFallbackCheck = true;
+
+            ITuplePartitionComputerFamily tpcf = new FieldHashPartitionComputerFamily(keyFields, hashFamilies);
+
+            ITuplePartitionComputerFamily tpcfMerge = new FieldHashPartitionComputerFamily(storedKeyFields,
+                    hashFamilies);
+
+            ByteBuffer readAheadBuf;
+
+            /**
+             * Compute the number of spilling partitions using the hybrid-hash formula.
+             * 
+             * @param tableSize
+             * @param framesLimit
+             * @param inputKeySize
+             * @param factor
+             * @return the number of spilling partitions (at least 1)
+             */
+            private int getNumberOfPartitions(int tableSize, int framesLimit, int inputKeySize, double factor) {
+
+                int hashtableHeaderPages = HybridHashGroupHashTable.getHeaderPages(tableSize, frameSize);
+
+                int numberOfPartitions = HybridHashUtil.hybridHashPartitionComputer((int) Math.ceil(inputKeySize),
+                        framesLimit, factor);
+
+                // if the number of partitions exceeds the frames available for hash table contents,
+                // use all frames for pure partitioning (the hash table then runs in partition-only mode)
+                if (numberOfPartitions >= framesLimit - hashtableHeaderPages) {
+                    numberOfPartitions = framesLimit;
+                }
+
+                if (numberOfPartitions <= 0) {
+                    numberOfPartitions = 1;
+                }
+
+                return numberOfPartitions;
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+
+                observedInputSizeInFrames = 0;
+
+                // estimate the number of unique keys for this partition, given the total raw record count and unique record count
+                long estimatedNumberOfUniqueKeys = HybridHashUtil.getEstimatedPartitionSizeOfUniqueKeys(
+                        inputSizeInRawRecords, inputSizeInUniqueKeys, 1);
+
+                userProvidedInputSizeInFrames = (int) Math.ceil(estimatedNumberOfUniqueKeys
+                        * userProvidedRecordSizeInBytes / frameSize);
+
+                int topPartitions = getNumberOfPartitions(tableSize, framesLimit,
+                        (int) Math.ceil(userProvidedInputSizeInFrames * ESTIMATOR_MAGNIFIER), fudgeFactor);
+
+                topProcessor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize, topPartitions, keyFields, 0,
+                        comparators, tpcf, aggregatorFactory.createAggregator(ctx, inRecDesc, recordDescriptors[0],
+                                keyFields, storedKeyFields), inRecDesc, recordDescriptors[0], writer);
+
+                writer.open();
+                topProcessor.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                observedInputSizeInFrames++;
+                topProcessor.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                topProcessor.finishup();
+
+                List<IFrameReader> runs = topProcessor.getSpilledRuns();
+                List<Integer> runsSizeInFrames = topProcessor.getSpilledRunsSizeInPages();
+
+                // get statistics from the hash table
+                int hashedKeys = topProcessor.getHashedUniqueKeys();
+                int hashedRawRecords = topProcessor.getHashedRawRecords();
+
+                // get the raw record count of each spilled partition (in tuples, not in pages)
+                List<Integer> partitionRawRecordsCount = topProcessor.getSpilledRunsSizeInTuples();
+
+                topProcessor.close();
+
+                // get a new estimate of the key size of the input data set: if the previous level did pure
+                // partitioning (nothing was hashed), use the size provided to that level; otherwise, derive
+                // the key ratio from the keys that were actually processed.
+                int newKeySizeInPages = (doInputAdjustment && hashedRawRecords > 0) ? (int) Math
+                        .ceil((double) hashedKeys / hashedRawRecords * observedInputSizeInFrames) : (int) Math
+                        .ceil(userProvidedInputSizeInFrames);
+
+                IFrameReader runReader;
+                int runSizeInFrames;
+                int partitionRawRecords;
+
+                while (!runs.isEmpty()) {
+
+                    runReader = runs.remove(0);
+                    runSizeInFrames = runsSizeInFrames.remove(0);
+                    partitionRawRecords = partitionRawRecordsCount.remove(0);
+
+                    // compute the estimated key size in frames for the run file
+                    int runKeySize;
+
+                    if (doInputAdjustment && hashedRawRecords > 0)
+                        runKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInFrames
+                                / observedInputSizeInFrames);
+                    else
+                        runKeySize = (int) Math.ceil((double) userProvidedInputSizeInFrames * partitionRawRecords
+                                / inputSizeInRawRecords);
+
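+                    // if a spilled run is estimated to still hold more than HYBRID_FALLBACK_THRESHOLD
+                    // of the total key size, hashing is not reducing the data (typically due to skew),
+                    // so fall back to the hash-sort algorithm instead of recursing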
+                    if (topLevelFallbackCheck && runKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
+                        fallBack(runReader, runSizeInFrames, runKeySize, 1);
+                    } else {
+                        processRunFiles(runReader, runKeySize, 1);
+                    }
+                }
+
+                writer.close();
+
+            }
+
+            private void processRunFiles(IFrameReader runReader, int uniqueKeysOfRunFileInFrames, int runLevel)
+                    throws HyracksDataException {
+
+                boolean checkFallback = true;
+
+                int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, uniqueKeysOfRunFileInFrames,
+                        fudgeFactor);
+
+                HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
+                        numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
+                                ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
+                        recordDescriptors[0], writer);
+
+                processor.open();
+
+                runReader.open();
+
+                int inputRunRawSizeInFrames = 0, inputRunRawSizeInTuples = 0;
+
+                if (readAheadBuf == null) {
+                    readAheadBuf = ctx.allocateFrame();
+                }
+                while (runReader.nextFrame(readAheadBuf)) {
+                    inputRunRawSizeInFrames++;
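+                    // the tuple count of a frame is stored in its last 4 bytes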
+                    inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
+                    processor.nextFrame(readAheadBuf);
+                }
+
+                runReader.close();
+
+                processor.finishup();
+
+                List<IFrameReader> runs = processor.getSpilledRuns();
+                List<Integer> runSizes = processor.getSpilledRunsSizeInPages();
+                List<Integer> partitionRawRecords = processor.getSpilledRunsSizeInTuples();
+
+                int directFlushKeysInTuples = processor.getHashedUniqueKeys();
+                int directFlushRawRecordsInTuples = processor.getHashedRawRecords();
+
+                processor.close();
+
+                int newKeySizeInPages = (doInputAdjustment && directFlushRawRecordsInTuples > 0) ? (int) Math
+                        .ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
+                                * inputRunRawSizeInFrames) : uniqueKeysOfRunFileInFrames;
+
+                IFrameReader recurRunReader;
+                int runSizeInPages, subPartitionRawRecords;
+
+                while (!runs.isEmpty()) {
+                    recurRunReader = runs.remove(0);
+                    runSizeInPages = runSizes.remove(0);
+                    subPartitionRawRecords = partitionRawRecords.remove(0);
+
+                    int newRunKeySize;
+
+                    if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
+                        // do adjustment
+                        newRunKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInPages
+                                / inputRunRawSizeInFrames);
+                    } else {
+                        // no adjustment
+                        newRunKeySize = (int) Math.ceil((double) subPartitionRawRecords * uniqueKeysOfRunFileInFrames
+                                / inputRunRawSizeInTuples);
+                    }
+
+                    if (checkFallback && newRunKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
+                        fallBack(recurRunReader, runSizeInPages, newRunKeySize, runLevel);
+                    } else {
+                        processRunFiles(recurRunReader, newRunKeySize, runLevel + 1);
+                    }
+
+                }
+            }
+
+            private void fallBack(IFrameReader recurRunReader, int runSizeInPages, int runKeySizeInPages, int runLevel)
+                    throws HyracksDataException {
+                fallbackHashSortAlgorithm(recurRunReader, runLevel + 1);
+            }
+
+            private void fallbackHashSortAlgorithm(IFrameReader recurRunReader, int runLevel)
+                    throws HyracksDataException {
+                // fall back
+                FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
+                HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
+                        keyFields, comparators, tpcf.createPartitioner(runLevel + 1),
+                        firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(ctx,
+                                inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
+                        recordDescriptors[0]);
+
+                recurRunReader.open();
+                if (readAheadBuf == null) {
+                    readAheadBuf = ctx.allocateFrame();
+                }
+                while (recurRunReader.nextFrame(readAheadBuf)) {
+                    runFrameTupleAccessor.reset(readAheadBuf);
+                    int tupleCount = runFrameTupleAccessor.getTupleCount();
+                    for (int j = 0; j < tupleCount; j++) {
+                        hhsTable.insert(runFrameTupleAccessor, j);
+                    }
+                }
+
+                recurRunReader.close();
+                hhsTable.finishup();
+
+                LinkedList<RunFileReader> hhsRuns = hhsTable.getRunFileReaders();
+
+                if (hhsRuns.isEmpty()) {
+                    hhsTable.flushHashtableToOutput(writer);
+                    hhsTable.close();
+                } else {
+                    hhsTable.close();
+                    HybridHashSortRunMerger hhsMerger = new HybridHashSortRunMerger(ctx, hhsRuns, storedKeyFields,
+                            comparators, recordDescriptors[0], tpcfMerge.createPartitioner(runLevel + 1),
+                            mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
+                                    storedKeyFields, storedKeyFields), framesLimit, tableSize, writer, false);
+                    hhsMerger.process();
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
new file mode 100644
index 0000000..666c172
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
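+/**
+ * A frame writer that partitions incoming records into run files, one per partition, without any
+ * aggregation, and collects per-partition size statistics in frames and in tuples.
+ */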
+public class HybridHashPartitionGenerateFrameWriter implements IFrameWriter {
+    private final IHyracksTaskContext ctx;
+
+    private RunFileWriter[] partitions;
+
+    private int[] partitionRawSizesInFrames;
+    private int[] partitionRawSizesInTuples;
+
+    private List<IFrameReader> partitionRunReaders;
+    private List<Integer> partitionRunAggregatedPages;
+    private List<Integer> partitionSizeInFrames;
+    private List<Integer> partitionSizeInTuples;
+
+    private ByteBuffer[] outputBuffers;
+
+    private final int numOfPartitions;
+
+    private final ITuplePartitionComputer tpc;
+
+    private final FrameTupleAccessor inFrameTupleAccessor;
+
+    private final FrameTupleAppender outFrameTupleAppender;
+
+    public HybridHashPartitionGenerateFrameWriter(IHyracksTaskContext ctx, int numOfPartitions,
+            ITuplePartitionComputer tpc, RecordDescriptor inRecDesc) {
+        this.ctx = ctx;
+        this.numOfPartitions = numOfPartitions;
+        this.tpc = tpc;
+        this.partitions = new RunFileWriter[numOfPartitions];
+        this.outputBuffers = new ByteBuffer[numOfPartitions];
+        this.partitionRawSizesInTuples = new int[numOfPartitions];
+        this.partitionRawSizesInFrames = new int[numOfPartitions];
+        this.inFrameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+        this.outFrameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#open()
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameTupleAccessor.reset(buffer);
+        int tupleCount = inFrameTupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            int pid = tpc.partition(inFrameTupleAccessor, i, numOfPartitions);
+
+            ctx.getCounterContext().getCounter("optional.partition.insert.count", true).update(1);
+
+            if (outputBuffers[pid] == null) {
+                outputBuffers[pid] = ctx.allocateFrame();
+            }
+            outFrameTupleAppender.reset(outputBuffers[pid], false);
+
+            if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
+                // flush the output buffer
+                if (partitions[pid] == null) {
+                    partitions[pid] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+                            HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
+                    partitions[pid].open();
+                }
+                FrameUtils.flushFrame(outputBuffers[pid], partitions[pid]);
+                partitionRawSizesInFrames[pid]++;
+                outFrameTupleAppender.reset(outputBuffers[pid], true);
+                if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
+                    throw new HyracksDataException(
+                            "Failed to insert a record into its partition: the record size is too large. ");
+                }
+            }
+            partitionRawSizesInTuples[pid]++;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#fail()
+     */
+    @Override
+    public void fail() throws HyracksDataException {
+        throw new HyracksDataException("Failed on hash partitioning.");
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#close()
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        outputBuffers = null;
+        for (RunFileWriter partWriter : partitions) {
+            if (partWriter != null)
+                partWriter.close();
+        }
+    }
+
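+    /**
+     * Flush any partially filled output buffers to their run files, then build the run readers and
+     * the per-partition size statistics for the generated partitions.
+     */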
+    public void finishup() throws HyracksDataException {
+        for (int i = 0; i < outputBuffers.length; i++) {
+            if (outputBuffers[i] == null) {
+                continue;
+            }
+            if (partitions[i] == null) {
+                partitions[i] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+                        HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
+                partitions[i].open();
+            }
+            outFrameTupleAppender.reset(outputBuffers[i], false);
+            if (outFrameTupleAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outputBuffers[i], partitions[i]);
+                partitionRawSizesInFrames[i]++;
+                outputBuffers[i] = null;
+            }
+        }
+
+        partitionRunReaders = new LinkedList<IFrameReader>();
+        partitionSizeInFrames = new LinkedList<Integer>();
+        partitionSizeInTuples = new LinkedList<Integer>();
+        partitionRunAggregatedPages = new LinkedList<Integer>();
+
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (partitions[i] != null) {
+                partitionRunReaders.add(partitions[i].createReader());
+                partitionRunAggregatedPages.add(0);
+                partitions[i].close();
+                partitionSizeInFrames.add(partitionRawSizesInFrames[i]);
+                partitionSizeInTuples.add(partitionRawSizesInTuples[i]);
+            }
+        }
+
+    }
+
+    public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
+        return partitionRunReaders;
+    }
+
+    public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
+        return partitionSizeInFrames;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
new file mode 100644
index 0000000..5323887
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+public class HybridHashUtil {
+
+    /**
+     * Compute the expected number of spilling partitions (the in-memory partition is not included), using the
+     * hybrid-hash formula from [Shapiro86]. A result of 0 means that no spilling partitions are needed.
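+     * The number of spilling partitions is computed as P = ceil((|K| * F - M) / (M - 1)), where |K| is the
+     * unique-key size of the input in frames, M is the memory budget in frames and F is the fudge factor;
+     * for example (hypothetical numbers), |K| = 1000, M = 100 and F = 1.2 give ceil(1100 / 99) = 12.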
+     * 
+     * @param inputSizeOfUniqueKeysInFrames
+     * @param memorySizeInFrames
+     * @param fudgeFactor
+     * @return
+     */
+    public static int hybridHashPartitionComputer(int inputSizeOfUniqueKeysInFrames, int memorySizeInFrames,
+            double fudgeFactor) {
+        return Math.max(
+                (int) Math.ceil((inputSizeOfUniqueKeysInFrames * fudgeFactor - memorySizeInFrames)
+                        / (memorySizeInFrames - 1)), 0);
+    }
+
+    /**
+     * Compute the estimated number of unique keys in a partition of a dataset, using Yao's formula
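+     * (for R raw records containing K unique keys split into P equal partitions, the expected number of
+     * unique keys per partition is K * (1 - (1 - 1/P)^(R/K)), which is the expression evaluated below)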
+     * 
+     * @param inputSizeInRawRecords
+     * @param inputSizeInUniqueKeys
+     * @param numOfPartitions
+     * @return
+     */
+    public static long getEstimatedPartitionSizeOfUniqueKeys(long inputSizeInRawRecords, long inputSizeInUniqueKeys,
+            int numOfPartitions) {
+        if (numOfPartitions == 1) {
+            return inputSizeInUniqueKeys;
+        }
+        return (long) Math.ceil(inputSizeInUniqueKeys
+                * (1 - Math.pow(1 - ((double) inputSizeInRawRecords / (double) numOfPartitions)
+                        / (double) inputSizeInRawRecords, (double) inputSizeInRawRecords
+                        / (double) inputSizeInUniqueKeys)));
+    }
+}