Clean up extra files after merge... how were these even still around?  Merge COMPLETE
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
deleted file mode 100644
index d63609e..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class GroupRunMergingFrameReader implements IFrameReader {
-
-    private static final int INT_SIZE = 4;
-
-    private final IHyracksTaskContext ctx;
-    private final IFrameReader[] runCursors;
-    private final List<ByteBuffer> inFrames;
-    private final int[] keyFields;
-    private final int framesLimit;
-    private final int tableSize;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDesc;
-    private final FrameTupleAppender outFrameAppender;
-    private final ITuplePartitionComputer tpc;
-    private ReferencedPriorityQueue topTuples;
-    private int[] tupleIndexes;
-    private int[] currentFrameIndexForRuns, bufferedFramesForRuns;
-    private FrameTupleAccessor[] tupleAccessors;
-    private int framesBuffered;
-
-    private final IAggregatorDescriptor grouper;
-    private final AggregateState groupState;
-
-    private final boolean isLoadBuffered;
-
-    private final boolean isFinalPhase;
-
-    private final ArrayTupleBuilder groupTupleBuilder, outputTupleBuilder;
-
-    private byte[] groupResultCache;
-    private ByteBuffer groupResultCacheBuffer;
-    private IFrameTupleAccessor groupResultCacheAccessor;
-    private FrameTupleAppender groupResultCacheAppender;
-
-    // FIXME
-    long queueCompCounter = 0, mergeCompCounter = 0;
-
-    public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
-            int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
-            IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase) {
-        this(ctx, runCursors, framesLimit, tableSize, keyFields, tpc, comparators, grouper, recordDesc, isFinalPhase,
-                false);
-    }
-
-    public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
-            int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
-            IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase, boolean isLoadBuffered) {
-        this.ctx = ctx;
-        this.runCursors = runCursors;
-        this.inFrames = new ArrayList<ByteBuffer>();
-        this.keyFields = keyFields;
-        this.tableSize = tableSize;
-        this.comparators = comparators;
-        this.recordDesc = recordDesc;
-        this.grouper = grouper;
-        this.groupState = grouper.createAggregateStates();
-        this.outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-        this.isLoadBuffered = isLoadBuffered;
-        this.isFinalPhase = isFinalPhase;
-        this.framesLimit = framesLimit;
-        this.tpc = tpc;
-
-        this.groupTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
-        this.outputTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#open()
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        if (isLoadBuffered) {
-            while (inFrames.size() + 1 < framesLimit) {
-                inFrames.add(ctx.allocateFrame());
-            }
-            framesBuffered = inFrames.size() / runCursors.length;
-        } else {
-            while (inFrames.size() < framesLimit - 1 && inFrames.size() < runCursors.length) {
-                inFrames.add(ctx.allocateFrame());
-            }
-            framesBuffered = 1;
-        }
-        tupleAccessors = new FrameTupleAccessor[runCursors.length];
-        currentFrameIndexForRuns = new int[runCursors.length];
-        bufferedFramesForRuns = new int[runCursors.length];
-        Comparator<ReferenceEntryWithBucketID> comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
-        tupleIndexes = new int[runCursors.length];
-
-        for (int i = 0; i < runCursors.length; i++) {
-            int runIndex = topTuples.peek().getRunid();
-            tupleIndexes[runIndex] = 0;
-            runCursors[runIndex].open();
-            for (int j = 0; j < framesBuffered; j++) {
-
-                if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
-
-                    bufferedFramesForRuns[runIndex]++;
-                    if (j == 0) {
-                        tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-                        tupleAccessors[runIndex].reset(inFrames.get(runIndex * framesBuffered + j));
-                        setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                        currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
-                    }
-                } else {
-                    break;
-                }
-            }
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#nextFrame(java.nio.ByteBuffer)
-     */
-    @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        outFrameAppender.reset(buffer, true);
-
-        while (!topTuples.areRunsExhausted()) {
-            ReferenceEntryWithBucketID top = topTuples.peek();
-            int runIndex = top.getRunid();
-            FrameTupleAccessor fta = top.getAccessor();
-            int tupleIndex = top.getTupleIndex();
-
-            // check whether we can do aggregation
-            boolean needInsert = true;
-            if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
-                groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
-                if (compareFrameTuples(fta, tupleIndex, groupResultCacheAccessor, 0) == 0) {
-                    needInsert = false;
-                }
-            }
-
-            if (needInsert) {
-
-                // try to flush the group cache into the output buffer, if any
-                if (groupResultCacheAccessor != null && groupResultCacheAccessor.getFieldCount() > 0) {
-                    outputTupleBuilder.reset();
-                    for (int k = 0; k < keyFields.length; k++) {
-                        outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
-                    }
-                    if (isFinalPhase) {
-                        grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
-                    } else {
-                        grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
-                    }
-
-                    // return if the buffer is full
-                    if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        return true;
-                    }
-                    groupResultCacheBuffer.putInt(groupResultCache.length - 4, 0);
-                }
-
-                groupTupleBuilder.reset();
-                for (int k : keyFields) {
-                    groupTupleBuilder.addField(fta, tupleIndex, k);
-                }
-                grouper.init(groupTupleBuilder, fta, tupleIndex, groupState);
-
-                // enlarge the cache buffer if necessary
-                int requiredSize = groupTupleBuilder.getSize() + groupTupleBuilder.getFieldEndOffsets().length
-                        * INT_SIZE + 2 * INT_SIZE;
-
-                if (groupResultCache == null || groupResultCache.length < requiredSize) {
-                    groupResultCache = new byte[requiredSize];
-                    groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
-                    groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
-                    groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, recordDesc);
-                }
-
-                // always reset the group cache
-                groupResultCacheAppender.reset(groupResultCacheBuffer, true);
-                if (!groupResultCacheAppender.append(groupTupleBuilder.getFieldEndOffsets(),
-                        groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
-                    throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
-                }
-
-                groupResultCacheAccessor.reset(groupResultCacheBuffer);
-
-            } else {
-                grouper.aggregate(fta, tupleIndex, groupResultCacheAccessor, 0, groupState);
-            }
-
-            ++tupleIndexes[runIndex];
-            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-        }
-
-        if (groupResultCacheAccessor != null && groupResultCacheAccessor.getTupleCount() > 0) {
-            outputTupleBuilder.reset();
-            for (int k = 0; k < keyFields.length; k++) {
-                outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
-            }
-            if (isFinalPhase) {
-                grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
-            } else {
-                grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
-            }
-
-            // return if the buffer is full
-            if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
-                    outputTupleBuilder.getSize())) {
-                return true;
-            }
-
-            groupResultCacheAccessor = null;
-            groupResultCache = null;
-            groupResultCacheBuffer = null;
-            groupResultCacheAppender = null;
-        }
-
-        if (outFrameAppender.getTupleCount() > 0) {
-            return true;
-        }
-
-        return false;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameReader#close()
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        for (int i = 0; i < runCursors.length; ++i) {
-            closeRun(i, runCursors, tupleAccessors);
-        }
-    }
-
-    private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
-        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-        if (exists) {
-            int h = tpc.partition(tupleAccessors[runIndex], tupleIndexes[runIndex], tableSize);
-            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex], h);
-        } else {
-            topTuples.pop();
-            closeRun(runIndex, runCursors, tupleAccessors);
-        }
-    }
-
-    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
-        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
-            return false;
-        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-            if (currentFrameIndexForRuns[runIndex] - runIndex * framesBuffered < bufferedFramesForRuns[runIndex] - 1) {
-                currentFrameIndexForRuns[runIndex]++;
-            } else {
-                bufferedFramesForRuns[runIndex] = 0;
-                for (int j = 0; j < framesBuffered; j++) {
-                    if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
-                        bufferedFramesForRuns[runIndex]++;
-                    } else {
-                        break;
-                    }
-                }
-                currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
-            }
-            if (bufferedFramesForRuns[runIndex] > 0) {
-                tupleAccessors[runIndex].reset(inFrames.get(currentFrameIndexForRuns[runIndex]));
-                tupleIndexes[runIndex] = 0;
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            return true;
-        }
-    }
-
-    private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
-            throws HyracksDataException {
-        if (runCursors[index] != null) {
-            runCursors[index].close();
-            runCursors[index] = null;
-            tupleAccessors[index] = null;
-        }
-    }
-
-    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
-        mergeCompCounter++;
-        byte[] b1 = fta1.getBuffer().array();
-        byte[] b2 = fta2.getBuffer().array();
-        for (int f = 0; f < keyFields.length; ++f) {
-            int fIdx = f;
-            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
-            int l1 = fta1.getFieldLength(j1, fIdx);
-            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
-            int l2_start = fta2.getFieldStartOffset(j2, fIdx);
-            int l2_end = fta2.getFieldEndOffset(j2, fIdx);
-            int l2 = l2_end - l2_start;
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    private Comparator<ReferenceEntryWithBucketID> createEntryComparator(final IBinaryComparator[] comparators) {
-        return new Comparator<ReferenceEntryWithBucketID>() {
-            public int compare(ReferenceEntryWithBucketID tp1, ReferenceEntryWithBucketID tp2) {
-
-                queueCompCounter++;
-
-                int cmp = tp1.getBucketID() - tp2.getBucketID();
-
-                if (cmp != 0) {
-                    return cmp;
-                }
-
-                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                int j1 = tp1.getTupleIndex();
-                int j2 = tp2.getTupleIndex();
-                byte[] b1 = fta1.getBuffer().array();
-                byte[] b2 = fta2.getBuffer().array();
-                for (int f = 0; f < keyFields.length; ++f) {
-                    int fIdx = keyFields[f];
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                    if (c != 0) {
-                        return c;
-                    }
-                }
-
-                return cmp;
-            }
-        };
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
deleted file mode 100644
index 6e85cff..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAccessorForGroupHashtable;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAppenderForGroupHashtable;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HybridHashSortGroupHashTable {
-
-    protected static final int INT_SIZE = 4;
-    protected static final int INIT_REF_COUNT = 8;
-    protected static final int PTR_SIZE = 3;
-
-    protected final int tableSize, framesLimit, frameSize;
-
-    protected final ByteBuffer[] headers;
-    protected final ByteBuffer[] contents;
-
-    protected final IHyracksTaskContext ctx;
-
-    protected int currentLargestFrameIndex;
-    protected int totalTupleCount;
-
-    protected final IAggregatorDescriptor aggregator;
-    protected final AggregateState aggState;
-
-    protected final int[] keys, internalKeys;
-
-    private final IBinaryComparator[] comparators;
-
-    protected final ITuplePartitionComputer tpc;
-
-    protected final INormalizedKeyComputer firstNormalizer;
-
-    private ByteBuffer outputBuffer;
-
-    private LinkedList<RunFileReader> runReaders;
-
-    protected TuplePointer matchPointer;
-
-    protected final FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
-
-    private final FrameTupleAccessorForGroupHashtable compFrameAccessor1, compFrameAccessor2;
-
-    protected final FrameTupleAppenderForGroupHashtable internalAppender;
-
-    private final FrameTupleAppender outputAppender;
-
-    /**
-     * Tuple builder for hash table insertion
-     */
-    protected final ArrayTupleBuilder internalTupleBuilder, outputTupleBuilder;
-
-    /**
-     * pointers for sort records in an entry
-     */
-    protected int[] tPointers;
-
-    protected int usedEntries = 0;
-
-    protected long hashedKeys = 0, hashedRawRec = 0;
-
-    public HybridHashSortGroupHashTable(IHyracksTaskContext ctx, int frameLimits, int tableSize, int[] keys,
-            IBinaryComparator[] comparators, ITuplePartitionComputer tpc,
-            INormalizedKeyComputer firstNormalizerComputer, IAggregatorDescriptor aggregator,
-            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc) {
-        this.ctx = ctx;
-        this.tableSize = tableSize;
-        this.framesLimit = frameLimits;
-        this.frameSize = ctx.getFrameSize();
-
-        this.keys = keys;
-        this.internalKeys = new int[keys.length];
-        for (int i = 0; i < internalKeys.length; i++) {
-            internalKeys[i] = i;
-        }
-
-        this.aggregator = aggregator;
-        this.aggState = aggregator.createAggregateStates();
-
-        this.tpc = tpc;
-        this.comparators = comparators;
-        this.firstNormalizer = firstNormalizerComputer;
-
-        // initialize the hash table
-        int residual = ((tableSize % frameSize) * INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
-        this.headers = new ByteBuffer[tableSize / frameSize * INT_SIZE * 2 + tableSize % frameSize * 2 * INT_SIZE
-                / frameSize + residual];
-
-        this.outputBuffer = ctx.allocateFrame();
-
-        this.contents = new ByteBuffer[framesLimit - 1 - headers.length];
-        this.currentLargestFrameIndex = -1;
-        this.totalTupleCount = 0;
-
-        this.runReaders = new LinkedList<RunFileReader>();
-        this.hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
-        this.compFrameAccessor1 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
-        this.compFrameAccessor2 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
-
-        this.internalTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
-        this.outputTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
-        this.internalAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
-        this.outputAppender = new FrameTupleAppender(frameSize);
-
-        this.matchPointer = new TuplePointer();
-
-    }
-
-    /**
-     * Reset the header page
-     * 
-     * @param headerFrameIndex
-     */
-    protected void resetHeader(int headerFrameIndex) {
-        for (int i = 0; i < frameSize; i += INT_SIZE) {
-            headers[headerFrameIndex].putInt(i, -1);
-        }
-    }
-
-    /**
-     * Get the header frame index of the given hash table entry
-     * 
-     * @param entry
-     * @return
-     */
-    protected int getHeaderFrameIndex(int entry) {
-        int frameIndex = entry / frameSize * 2 * INT_SIZE + entry % frameSize * 2 * INT_SIZE / frameSize;
-        return frameIndex;
-    }
-
-    /**
-     * Get the tuple index of the given hash table entry
-     * 
-     * @param entry
-     * @return
-     */
-    protected int getHeaderTupleIndex(int entry) {
-        int offset = entry % frameSize * 2 * INT_SIZE % frameSize;
-        return offset;
-    }
-
-    public void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
-
-        int entry = tpc.partition(accessor, tupleIndex, tableSize);
-
-        hashedRawRec++;
-
-        if (findMatch(entry, accessor, tupleIndex)) {
-            // find match; do aggregation
-            hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
-            aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex, aggState);
-        } else {
-
-            internalTupleBuilder.reset();
-            for (int k = 0; k < keys.length; k++) {
-                internalTupleBuilder.addField(accessor, tupleIndex, keys[k]);
-            }
-            aggregator.init(internalTupleBuilder, accessor, tupleIndex, aggState);
-            int insertFrameIndex = -1, insertTupleIndex = -1;
-            boolean inserted = false;
-
-            if (currentLargestFrameIndex < 0) {
-                currentLargestFrameIndex = 0;
-            }
-
-            if (contents[currentLargestFrameIndex] == null) {
-                contents[currentLargestFrameIndex] = ctx.allocateFrame();
-            }
-
-            internalAppender.reset(contents[currentLargestFrameIndex], false);
-            if (internalAppender.append(internalTupleBuilder.getFieldEndOffsets(), internalTupleBuilder.getByteArray(),
-                    0, internalTupleBuilder.getSize())) {
-                inserted = true;
-                insertFrameIndex = currentLargestFrameIndex;
-                insertTupleIndex = internalAppender.getTupleCount() - 1;
-            }
-
-            if (!inserted && currentLargestFrameIndex < contents.length - 1) {
-                currentLargestFrameIndex++;
-                if (contents[currentLargestFrameIndex] == null) {
-                    contents[currentLargestFrameIndex] = ctx.allocateFrame();
-                }
-                internalAppender.reset(contents[currentLargestFrameIndex], true);
-                if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
-                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                    throw new HyracksDataException("Failed to insert an aggregation value.");
-                } else {
-                    insertFrameIndex = currentLargestFrameIndex;
-                    insertTupleIndex = internalAppender.getTupleCount() - 1;
-                    inserted = true;
-                }
-            }
-
-            // memory is full
-            if (!inserted) {
-                // flush hash table and try to insert again
-                flush();
-
-                // update the match point to the header reference
-                matchPointer.frameIndex = -1;
-                matchPointer.tupleIndex = -1;
-                // re-insert
-                currentLargestFrameIndex++;
-                if (contents[currentLargestFrameIndex] == null) {
-                    contents[currentLargestFrameIndex] = ctx.allocateFrame();
-                }
-                internalAppender.reset(contents[currentLargestFrameIndex], true);
-                if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
-                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                    throw new HyracksDataException("Failed to insert an aggregation value.");
-                } else {
-                    insertFrameIndex = currentLargestFrameIndex;
-                    insertTupleIndex = internalAppender.getTupleCount() - 1;
-                }
-            }
-
-            // no match; new insertion
-            if (matchPointer.frameIndex < 0) {
-                // first record for this entry; update the header references
-                int headerFrameIndex = getHeaderFrameIndex(entry);
-                int headerFrameOffset = getHeaderTupleIndex(entry);
-                if (headers[headerFrameIndex] == null) {
-                    headers[headerFrameIndex] = ctx.allocateFrame();
-                    resetHeader(headerFrameIndex);
-                }
-                headers[headerFrameIndex].putInt(headerFrameOffset, insertFrameIndex);
-                headers[headerFrameIndex].putInt(headerFrameOffset + INT_SIZE, insertTupleIndex);
-                usedEntries++;
-
-            } else {
-                // update the previous reference
-                hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
-                int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
-                contents[matchPointer.frameIndex].putInt(refOffset, insertFrameIndex);
-                contents[matchPointer.frameIndex].putInt(refOffset + INT_SIZE, insertTupleIndex);
-            }
-            hashedKeys++;
-            totalTupleCount++;
-        }
-    }
-
-    /**
-     * Flush the hash table directly to the output
-     */
-    public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
-
-        outputAppender.reset(outputBuffer, true);
-        for (int i = 0; i < contents.length; i++) {
-            if (contents[i] == null) {
-                continue;
-            }
-            hashtableRecordAccessor.reset(contents[i]);
-            int tupleCount = hashtableRecordAccessor.getTupleCount();
-            for (int j = 0; j < tupleCount; j++) {
-                outputTupleBuilder.reset();
-
-                int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(j);
-                int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
-
-                for (int k = 0; k < internalKeys.length; k++) {
-                    outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
-                            + hashtableRecordAccessor.getFieldStartOffset(j, k),
-                            hashtableRecordAccessor.getFieldLength(j, k));
-                }
-
-                aggregator.outputFinalResult(outputTupleBuilder, hashtableRecordAccessor, j, aggState);
-
-                if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
-                        0, outputTupleBuilder.getSize())) {
-
-                    FrameUtils.flushFrame(outputBuffer, outputWriter);
-
-                    outputAppender.reset(outputBuffer, true);
-                    if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        throw new HyracksDataException("Failed to flush the hash table to the final output");
-                    }
-                }
-            }
-        }
-
-        if (outputAppender.getTupleCount() > 0) {
-
-            FrameUtils.flushFrame(outputBuffer, outputWriter);
-
-            outputAppender.reset(outputBuffer, true);
-        }
-
-        totalTupleCount = 0;
-        usedEntries = 0;
-    }
-
-    /**
-     * Flush hash table into a run file.
-     * 
-     * @throws HyracksDataException
-     */
-    protected void flush() throws HyracksDataException {
-
-        long methodTimer = System.nanoTime();
-
-        FileReference runFile;
-        try {
-            runFile = ctx.getJobletContext().createManagedWorkspaceFile(
-                    HybridHashSortGroupHashTable.class.getSimpleName());
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-        RunFileWriter runWriter = new RunFileWriter(runFile, ctx.getIOManager());
-        runWriter.open();
-        flushEntries(runWriter);
-        runWriter.close();
-        runReaders.add(runWriter.createReader());
-        reset();
-
-        ctx.getCounterContext()
-                .getCounter("optional." + HybridHashSortGroupHashTable.class.getSimpleName() + ".flush.time", true)
-                .update(System.nanoTime() - methodTimer);
-    }
-
-    private void flushEntries(IFrameWriter writer) throws HyracksDataException {
-
-        outputAppender.reset(outputBuffer, true);
-        for (int i = 0; i < tableSize; i++) {
-            int tupleInEntry = sortEntry(i);
-
-            for (int ptr = 0; ptr < tupleInEntry; ptr++) {
-                int frameIndex = tPointers[ptr * PTR_SIZE];
-                int tupleIndex = tPointers[ptr * PTR_SIZE + 1];
-
-                hashtableRecordAccessor.reset(contents[frameIndex]);
-                outputTupleBuilder.reset();
-
-                int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(tupleIndex);
-                int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
-
-                for (int k = 0; k < internalKeys.length; k++) {
-                    outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
-                            + hashtableRecordAccessor.getFieldStartOffset(tupleIndex, k),
-                            hashtableRecordAccessor.getFieldLength(tupleIndex, k));
-                }
-
-                aggregator.outputPartialResult(outputTupleBuilder, hashtableRecordAccessor, tupleIndex, aggState);
-
-                if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
-                        0, outputTupleBuilder.getSize())) {
-
-                    FrameUtils.flushFrame(outputBuffer, writer);
-
-                    outputAppender.reset(outputBuffer, true);
-                    if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        throw new HyracksDataException("Failed to flush an aggregation result.");
-                    }
-                }
-                totalTupleCount--;
-            }
-
-            if (tupleInEntry > 0) {
-                usedEntries--;
-            }
-        }
-
-        if (outputAppender.getTupleCount() > 0) {
-
-            FrameUtils.flushFrame(outputBuffer, writer);
-
-            outputAppender.reset(outputBuffer, true);
-        }
-    }
-
-    protected int sortEntry(int entryID) {
-
-        if (tPointers == null)
-            tPointers = new int[INIT_REF_COUNT * PTR_SIZE];
-        int ptr = 0;
-
-        int headerFrameIndex = entryID / frameSize * 2 * INT_SIZE + (entryID % frameSize) * 2 * INT_SIZE / frameSize;
-        int headerFrameOffset = (entryID % frameSize) * 2 * INT_SIZE % frameSize;
-
-        if (headers[headerFrameIndex] == null) {
-            return 0;
-        }
-
-        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
-        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
-        do {
-            if (entryFrameIndex < 0) {
-                break;
-            }
-            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
-            tPointers[ptr * PTR_SIZE] = entryFrameIndex;
-            tPointers[ptr * PTR_SIZE + 1] = entryTupleIndex;
-            int tStart = hashtableRecordAccessor.getTupleStartOffset(entryTupleIndex);
-            int f0StartRel = hashtableRecordAccessor.getFieldStartOffset(entryTupleIndex, internalKeys[0]);
-            int f0EndRel = hashtableRecordAccessor.getFieldEndOffset(entryTupleIndex, internalKeys[0]);
-            int f0Start = f0StartRel + tStart + hashtableRecordAccessor.getFieldSlotsLength();
-            tPointers[ptr * PTR_SIZE + 2] = firstNormalizer == null ? 0 : firstNormalizer.normalize(
-                    hashtableRecordAccessor.getBuffer().array(), f0Start, f0EndRel - f0StartRel);
-
-            ptr++;
-
-            if (ptr * PTR_SIZE >= tPointers.length) {
-                int[] newTPointers = new int[tPointers.length * 2];
-                System.arraycopy(tPointers, 0, newTPointers, 0, tPointers.length);
-                tPointers = newTPointers;
-            }
-
-            // move to the next record
-            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
-            int prevFrameIndex = entryFrameIndex;
-            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
-            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
-
-        } while (true);
-
-        // sort records
-        if (ptr > 1) {
-            sort(0, ptr);
-        }
-
-        return ptr;
-    }
-
-    protected void sort(int offset, int len) {
-        int m = offset + (len >> 1);
-        int mFrameIndex = tPointers[m * PTR_SIZE];
-        int mTupleIndex = tPointers[m * PTR_SIZE + 1];
-        int mNormKey = tPointers[m * PTR_SIZE + 2];
-        compFrameAccessor1.reset(contents[mFrameIndex]);
-
-        int a = offset;
-        int b = a;
-        int c = offset + len - 1;
-        int d = c;
-        while (true) {
-            while (b <= c) {
-                int bFrameIndex = tPointers[b * PTR_SIZE];
-                int bTupleIndex = tPointers[b * PTR_SIZE + 1];
-                int bNormKey = tPointers[b * PTR_SIZE + 2];
-                int cmp = 0;
-                if (bNormKey != mNormKey) {
-                    cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                } else {
-                    compFrameAccessor2.reset(contents[bFrameIndex]);
-                    cmp = compare(compFrameAccessor2, bTupleIndex, compFrameAccessor1, mTupleIndex);
-                }
-                if (cmp > 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(a++, b);
-                }
-                ++b;
-            }
-            while (c >= b) {
-                int cFrameIndex = tPointers[c * PTR_SIZE];
-                int cTupleIndex = tPointers[c * PTR_SIZE + 1];
-                int cNormKey = tPointers[c * PTR_SIZE + 2];
-                int cmp = 0;
-                if (cNormKey != mNormKey) {
-                    cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                } else {
-                    compFrameAccessor2.reset(contents[cFrameIndex]);
-                    cmp = compare(compFrameAccessor2, cTupleIndex, compFrameAccessor1, mTupleIndex);
-                }
-                if (cmp < 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(c, d--);
-                }
-                --c;
-            }
-            if (b > c)
-                break;
-            swap(b++, c--);
-        }
-
-        int s;
-        int n = offset + len;
-        s = Math.min(a - offset, b - a);
-        vecswap(offset, b - s, s);
-        s = Math.min(d - c, n - d - 1);
-        vecswap(b, n - s, s);
-
-        if ((s = b - a) > 1) {
-            sort(offset, s);
-        }
-        if ((s = d - c) > 1) {
-            sort(n - s, s);
-        }
-    }
-
-    private void swap(int a, int b) {
-        for (int i = 0; i < PTR_SIZE; i++) {
-            int t = tPointers[a * PTR_SIZE + i];
-            tPointers[a * PTR_SIZE + i] = tPointers[b * PTR_SIZE + i];
-            tPointers[b * PTR_SIZE + i] = t;
-        }
-    }
-
-    private void vecswap(int a, int b, int n) {
-        for (int i = 0; i < n; i++, a++, b++) {
-            swap(a, b);
-        }
-    }
-
-    protected boolean findMatch(int entry, FrameTupleAccessor accessor, int tupleIndex) {
-
-        // reset the match pointer
-        matchPointer.frameIndex = -1;
-        matchPointer.tupleIndex = -1;
-
-        // get reference in the header
-        int headerFrameIndex = getHeaderFrameIndex(entry);
-        int headerFrameOffset = getHeaderTupleIndex(entry);
-
-        if (headers[headerFrameIndex] == null) {
-            return false;
-        }
-
-        // initialize the pointer to the first record 
-        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
-        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
-        while (entryFrameIndex >= 0) {
-            matchPointer.frameIndex = entryFrameIndex;
-            matchPointer.tupleIndex = entryTupleIndex;
-            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
-
-            if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
-                return true;
-            }
-            // Move to the next record in this entry following the linked list
-            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
-            int prevFrameIndex = entryFrameIndex;
-            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
-            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
-        }
-
-        return false;
-    }
-
-    public LinkedList<RunFileReader> getRunFileReaders() {
-        return runReaders;
-    }
-
-    private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
-            int hashTupleIndex) {
-        int tStart0 = accessor.getTupleStartOffset(tupleIndex);
-        int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
-
-        int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
-        int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
-
-        for (int i = 0; i < keys.length; ++i) {
-            int fStart0 = accessor.getFieldStartOffset(tupleIndex, keys[i]);
-            int fEnd0 = accessor.getFieldEndOffset(tupleIndex, keys[i]);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
-            int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    private int compare(FrameTupleAccessorForGroupHashtable accessor1, int tupleIndex1,
-            FrameTupleAccessorForGroupHashtable accessor2, int tupleIndex2) {
-        int tStart1 = accessor1.getTupleStartOffset(tupleIndex1);
-        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-        int tStart2 = accessor2.getTupleStartOffset(tupleIndex2);
-        int fStartOffset2 = accessor2.getFieldSlotsLength() + tStart2;
-
-        for (int i = 0; i < internalKeys.length; ++i) {
-            int fStart1 = accessor1.getFieldStartOffset(tupleIndex1, internalKeys[i]);
-            int fEnd1 = accessor1.getFieldEndOffset(tupleIndex1, internalKeys[i]);
-            int fLen1 = fEnd1 - fStart1;
-
-            int fStart2 = accessor2.getFieldStartOffset(tupleIndex2, internalKeys[i]);
-            int fEnd2 = accessor2.getFieldEndOffset(tupleIndex2, internalKeys[i]);
-            int fLen2 = fEnd2 - fStart2;
-
-            int c = comparators[i].compare(accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1, accessor2
-                    .getBuffer().array(), fStart2 + fStartOffset2, fLen2);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    public void reset() {
-        for (int i = 0; i < headers.length; i++) {
-            if (headers[i] != null) {
-                resetHeader(i);
-            }
-        }
-        for (int i = 0; i < contents.length; i++) {
-            if (contents[i] != null) {
-                contents[i].putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
-            }
-        }
-
-        usedEntries = 0;
-        totalTupleCount = 0;
-        currentLargestFrameIndex = -1;
-    }
-
-    public void finishup() throws HyracksDataException {
-        if (runReaders.size() > 0) {
-            flush();
-        }
-
-        hashedKeys = 0;
-        hashedRawRec = 0;
-    }
-
-    /**
-     * Close the hash table. Note that only memory allocated by frames are freed. Aggregation
-     * states maintained in {@link #aggState} and run file readers in {@link #runReaders} should
-     * be valid for later processing.
-     */
-    public void close() throws HyracksDataException {
-        for (int i = 0; i < headers.length; i++) {
-            headers[i] = null;
-        }
-        for (int i = 0; i < contents.length; i++) {
-            contents[i] = null;
-        }
-        outputBuffer = null;
-        tPointers = null;
-    }
-
-    public int getTupleCount() {
-        return totalTupleCount;
-    }
-
-    public int getFrameSize() {
-        return headers.length + contents.length + 1;
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
deleted file mode 100644
index 5296c9f..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class HybridHashSortGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    private static final int AGGREGATE_ACTIVITY_ID = 0;
-
-    private static final int MERGE_ACTIVITY_ID = 1;
-
-    private static final long serialVersionUID = 1L;
-    private final int[] keyFields, storedKeyFields;
-    private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-    private final IAggregatorDescriptorFactory mergerFactory;
-
-    private final ITuplePartitionComputerFactory aggTpcf, mergeTpcf;
-
-    private final int framesLimit;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-
-    private final int tableSize;
-
-    private final boolean isLoadOptimized;
-
-    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
-            ITuplePartitionComputerFactory mergeTpcf, IAggregatorDescriptorFactory aggregatorFactory,
-            IAggregatorDescriptorFactory mergerFactory, RecordDescriptor recordDescriptor) {
-        this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, null, aggregatorFactory,
-                mergerFactory, recordDescriptor, false);
-    }
-
-    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
-            ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor recordDescriptor) {
-        this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, firstNormalizerFactory,
-                aggregatorFactory, mergerFactory, recordDescriptor, false);
-    }
-
-    public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
-            ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor recordDescriptor, boolean isLoadOpt) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        if (framesLimit <= 2) {
-            /**
-             * Minimum of 3 frames: 2 for in-memory hash table, and 1 for output
-             * aggregation results.
-             */
-            throw new IllegalStateException("frame limit should at least be 3, but it is " + framesLimit + "!");
-        }
-
-        storedKeyFields = new int[keyFields.length];
-        for (int i = 0; i < storedKeyFields.length; i++) {
-            storedKeyFields[i] = i;
-        }
-        this.aggregatorFactory = aggregatorFactory;
-        this.mergerFactory = mergerFactory;
-        this.keyFields = keyFields;
-        this.comparatorFactories = comparatorFactories;
-        this.firstNormalizerFactory = firstNormalizerFactory;
-        this.aggTpcf = aggTpcf;
-        this.mergeTpcf = mergeTpcf;
-        this.tableSize = tableSize;
-
-        /**
-         * Set the record descriptor. Note that since this operator is a unary
-         * operator, only the first record descriptor is used here.
-         */
-        recordDescriptors[0] = recordDescriptor;
-
-        this.isLoadOptimized = isLoadOpt;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities(edu.uci.ics.hyracks.api.dataflow.
-     * IActivityGraphBuilder)
-     */
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
-        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
-        builder.addActivity(this, aggregateAct);
-        builder.addSourceEdge(0, aggregateAct, 0);
-
-        builder.addActivity(this, mergeAct);
-        builder.addTargetEdge(0, mergeAct, 0);
-
-        builder.addBlockingEdge(aggregateAct, mergeAct);
-    }
-
-    public static class AggregateActivityState extends AbstractStateObject {
-
-        private HybridHashSortGroupHashTable gTable;
-
-        public AggregateActivityState() {
-        }
-
-        private AggregateActivityState(JobId jobId, TaskId tId) {
-            super(jobId, tId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private class AggregateActivity extends AbstractActivityNode {
-
-        private static final long serialVersionUID = 1L;
-
-        public AggregateActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
-            return new AbstractUnaryInputSinkOperatorNodePushable() {
-
-                HybridHashSortGroupHashTable serializableGroupHashtable;
-
-                FrameTupleAccessor accessor;
-
-                @Override
-                public void open() throws HyracksDataException {
-
-                    RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-
-                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                    for (int i = 0; i < comparatorFactories.length; i++) {
-                        comparators[i] = comparatorFactories[i].createBinaryComparator();
-                    }
-
-                    serializableGroupHashtable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
-                            keyFields, comparators, aggTpcf.createPartitioner(),
-                            firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(
-                                    ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
-                            recordDescriptors[0]);
-                    accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; i++) {
-                        serializableGroupHashtable.insert(accessor, i);
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    serializableGroupHashtable.finishup();
-                    AggregateActivityState state = new AggregateActivityState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));
-                    state.gTable = serializableGroupHashtable;
-                    ctx.setStateObject(state);
-                }
-            };
-        }
-    }
-
-    private class MergeActivity extends AbstractActivityNode {
-
-        private static final long serialVersionUID = 1L;
-
-        public MergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
-
-            return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
-                public void initialize() throws HyracksDataException {
-
-                    AggregateActivityState aggState = (AggregateActivityState) ctx.getStateObject(new TaskId(
-                            new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition));
-
-                    LinkedList<RunFileReader> runs = aggState.gTable.getRunFileReaders();
-
-                    writer.open();
-                    if (runs.size() <= 0) {
-                        aggState.gTable.flushHashtableToOutput(writer);
-                        aggState.gTable.close();
-                    } else {
-                        aggState.gTable.close();
-
-                        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                        for (int i = 0; i < comparatorFactories.length; i++) {
-                            comparators[i] = comparatorFactories[i].createBinaryComparator();
-                        }
-
-                        HybridHashSortRunMerger merger = new HybridHashSortRunMerger(ctx, runs, storedKeyFields,
-                                comparators, recordDescriptors[0], mergeTpcf.createPartitioner(),
-                                mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
-                                        storedKeyFields, storedKeyFields), framesLimit, tableSize, writer,
-                                isLoadOptimized);
-
-                        merger.process();
-                    }
-
-                    writer.close();
-                }
-
-            };
-        }
-    }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
deleted file mode 100644
index 1de2237..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class HybridHashSortGrouperBucketMerge {
-
-    private final int[] keyFields;
-    private final IBinaryComparator[] comparators;
-
-    private final IAggregatorDescriptor merger;
-    private final AggregateState mergeState;
-
-    private final int framesLimit, tableSize;
-
-    private final RecordDescriptor inRecDesc;
-
-    private final IHyracksTaskContext ctx;
-
-    private final ArrayTupleBuilder tupleBuilder;
-
-    private final IFrameWriter outputWriter;
-
-    private final ITuplePartitionComputer tpc;
-
-    private final boolean isLoadOptimized;
-
-    List<ByteBuffer> inFrames;
-    ByteBuffer outFrame, writerFrame;
-    FrameTupleAppender outAppender, writerAppender;
-    LinkedList<RunFileReader> runs;
-    ArrayTupleBuilder finalTupleBuilder;
-    FrameTupleAccessor outFrameAccessor;
-    int[] currentFrameIndexInRun, currentRunFrames, currentBucketInRun;
-    int runFrameLimit = 1;
-
-    public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
-            ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
-            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        this.framesLimit = framesLimit;
-        this.tableSize = tableSize;
-
-        this.keyFields = keyFields;
-        this.comparators = comparators;
-        this.merger = merger;
-        this.mergeState = merger.createAggregateStates();
-
-        this.inRecDesc = inRecDesc;
-
-        this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
-
-        this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
-
-        this.outputWriter = outputWriter;
-
-        this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-
-        this.tpc = tpc;
-
-        this.isLoadOptimized = true;
-    }
-
-    public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
-            ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
-            RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter, boolean loadOptimized)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        this.framesLimit = framesLimit;
-        this.tableSize = tableSize;
-
-        this.keyFields = keyFields;
-        this.comparators = comparators;
-        this.merger = merger;
-        this.mergeState = merger.createAggregateStates();
-
-        this.inRecDesc = inRecDesc;
-
-        this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
-
-        this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
-
-        this.outputWriter = outputWriter;
-
-        this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-
-        this.tpc = tpc;
-
-        this.isLoadOptimized = loadOptimized;
-    }
-
-    public void initialize(LinkedList<RunFileReader> runFiles) throws HyracksDataException {
-
-        runs = runFiles;
-
-        try {
-            if (runs.size() <= 0) {
-                return;
-            } else {
-                inFrames = new ArrayList<ByteBuffer>();
-                outFrame = ctx.allocateFrame();
-                outAppender.reset(outFrame, true);
-                outFrameAccessor.reset(outFrame);
-                int runProcOffset = 0;
-                while (runs.size() > 0) {
-                    try {
-                        doPass(runs, runProcOffset);
-                        if (runs.size() + 2 <= framesLimit) {
-                            // final phase
-                            runProcOffset = 0;
-                        } else {
-                            // one more merge level
-                            runProcOffset++;
-                        }
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-                inFrames.clear();
-            }
-        } catch (Exception e) {
-            outputWriter.fail();
-            throw new HyracksDataException(e);
-        } finally {
-            mergeState.close();
-        }
-    }
-
-    private void doPass(LinkedList<RunFileReader> runs, int offset) throws HyracksDataException {
-        FileReference newRun = null;
-        IFrameWriter writer = outputWriter;
-        boolean finalPass = false;
-
-        int runNumber = runs.size() - offset;
-
-        while (inFrames.size() + 2 < framesLimit) {
-            inFrames.add(ctx.allocateFrame());
-        }
-
-        if (runNumber + 2 <= framesLimit) {
-            finalPass = true;
-            if (isLoadOptimized)
-                runFrameLimit = (framesLimit - 2) / runNumber;
-            else
-                runFrameLimit = 1;
-        } else {
-            runFrameLimit = 1;
-            runNumber = framesLimit - 2;
-            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
-                    HybridHashSortGrouperBucketMerge.class.getSimpleName());
-            writer = new RunFileWriter(newRun, ctx.getIOManager());
-            writer.open();
-        }
-        try {
-            currentFrameIndexInRun = new int[runNumber];
-            currentRunFrames = new int[runNumber];
-            currentBucketInRun = new int[runNumber];
-            /**
-             * Create file readers for each input run file, only for
-             * the ones fit into the inFrames
-             */
-            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-            Comparator<ReferenceHashEntry> comparator = createEntryComparator(comparators);
-            ReferencedBucketBasedPriorityQueue topTuples = new ReferencedBucketBasedPriorityQueue(ctx.getFrameSize(),
-                    inRecDesc, runNumber, comparator, tpc, tableSize);
-            /**
-             * current tuple index in each run
-             */
-            int[] tupleIndices = new int[runNumber];
-
-            for (int i = 0; i < runNumber; i++) {
-                int runIndex = i + offset;
-                tupleIndices[i] = 0;
-                // Load the run file
-                runFileReaders[i] = runs.get(runIndex);
-                runFileReaders[i].open();
-
-                currentRunFrames[i] = 0;
-                currentFrameIndexInRun[i] = i * runFrameLimit;
-                for (int j = 0; j < runFrameLimit; j++) {
-                    int frameIndex = currentFrameIndexInRun[i] + j;
-                    boolean hasNextFrame = runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex));
-                    if (hasNextFrame) {
-                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
-                        currentRunFrames[i]++;
-                        if (j == 0) {
-                            currentBucketInRun[i] = tpc.partition(tupleAccessors[frameIndex], tupleIndices[i],
-                                    tableSize);
-                            setNextTopTuple(i, tupleIndices, runFileReaders, tupleAccessors, topTuples);
-                        }
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            /**
-             * Start merging
-             */
-            while (!topTuples.areRunsExhausted()) {
-                /**
-                 * Get the top record
-                 */
-                ReferenceEntry top = topTuples.peek();
-                int tupleIndex = top.getTupleIndex();
-                int runIndex = topTuples.peek().getRunid();
-
-                FrameTupleAccessor fta = top.getAccessor();
-
-                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
-                if (currentTupleInOutFrame < 0
-                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
-
-                    tupleBuilder.reset();
-
-                    for (int k = 0; k < keyFields.length; k++) {
-                        tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
-                    }
-
-                    merger.init(tupleBuilder, fta, tupleIndex, mergeState);
-
-                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                        flushOutFrame(writer, finalPass);
-                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                            throw new HyracksDataException(
-                                    "The partial result is too large to be initialized in a frame.");
-                        }
-                    }
-
-                } else {
-                    /**
-                     * if new tuple is in the same group of the
-                     * current aggregator do merge and output to the
-                     * outFrame
-                     */
-
-                    merger.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, mergeState);
-
-                }
-                tupleIndices[runIndex]++;
-                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
-            }
-
-            if (outAppender.getTupleCount() > 0) {
-                flushOutFrame(writer, finalPass);
-                outAppender.reset(outFrame, true);
-            }
-
-            merger.close();
-
-            runs.subList(offset, runNumber).clear();
-            /**
-             * insert the new run file into the beginning of the run
-             * file list
-             */
-            if (!finalPass) {
-                runs.add(offset, ((RunFileWriter) writer).createReader());
-            }
-        } finally {
-            if (!finalPass) {
-                writer.close();
-            }
-            mergeState.reset();
-        }
-    }
-
-    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-
-        if (finalTupleBuilder == null) {
-            finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFields().length);
-        }
-
-        if (writerFrame == null) {
-            writerFrame = ctx.allocateFrame();
-        }
-
-        if (writerAppender == null) {
-            writerAppender = new FrameTupleAppender(ctx.getFrameSize());
-        }
-        writerAppender.reset(writerFrame, true);
-
-        outFrameAccessor.reset(outFrame);
-
-        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
-            finalTupleBuilder.reset();
-
-            for (int k = 0; k < keyFields.length; k++) {
-                finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
-            }
-
-            if (isFinal) {
-
-                merger.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
-
-            } else {
-
-                merger.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
-            }
-
-            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                FrameUtils.flushFrame(writerFrame, writer);
-                writerAppender.reset(writerFrame, true);
-                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
-                }
-            }
-        }
-        if (writerAppender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(writerFrame, writer);
-            writerAppender.reset(writerFrame, true);
-        }
-
-        outAppender.reset(outFrame, true);
-    }
-
-    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedBucketBasedPriorityQueue topTuples)
-            throws HyracksDataException {
-        int runStart = runIndex * runFrameLimit;
-        boolean existNext = false;
-        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
-            /**
-             * run already closed
-             */
-            existNext = false;
-        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
-            /**
-             * not the last frame for this run
-             */
-            existNext = true;
-            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
-                tupleIndices[runIndex] = 0;
-                currentFrameIndexInRun[runIndex]++;
-            }
-        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
-            /**
-             * the last frame has expired
-             */
-            existNext = true;
-        } else {
-            /**
-             * If all tuples in the targeting frame have been
-             * checked.
-             */
-            tupleIndices[runIndex] = 0;
-            currentFrameIndexInRun[runIndex] = runStart;
-            /**
-             * read in batch
-             */
-            currentRunFrames[runIndex] = 0;
-            for (int j = 0; j < runFrameLimit; j++) {
-                int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
-                    existNext = true;
-                    currentRunFrames[runIndex]++;
-                } else {
-                    break;
-                }
-            }
-        }
-
-        if (existNext) {
-            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
-        } else {
-            topTuples.pop();
-            closeRun(runIndex, runCursors, tupleAccessors);
-        }
-    }
-
-    /**
-     * Close the run file, and also the corresponding readers and
-     * input frame.
-     * 
-     * @param index
-     * @param runCursors
-     * @param tupleAccessor
-     * @throws HyracksDataException
-     */
-    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-            throws HyracksDataException {
-        if (runCursors[index] != null) {
-            runCursors[index].close();
-            runCursors[index] = null;
-            int frameOffset = index * runFrameLimit;
-            for (int j = 0; j < runFrameLimit; j++) {
-                tupleAccessor[frameOffset + j] = null;
-            }
-        }
-    }
-
-    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
-        byte[] b1 = fta1.getBuffer().array();
-        byte[] b2 = fta2.getBuffer().array();
-        for (int f = 0; f < keyFields.length; ++f) {
-            int fIdx = f;
-            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
-            int l1 = fta1.getFieldLength(j1, fIdx);
-            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
-            int l2_start = fta2.getFieldStartOffset(j2, fIdx);
-            int l2_end = fta2.getFieldEndOffset(j2, fIdx);
-            int l2 = l2_end - l2_start;
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    private Comparator<ReferenceHashEntry> createEntryComparator(final IBinaryComparator[] comparators) {
-        return new Comparator<ReferenceHashEntry>() {
-
-            @Override
-            public int compare(ReferenceHashEntry o1, ReferenceHashEntry o2) {
-                int cmp = o1.getHashValue() - o2.getHashValue();
-                if (cmp != 0) {
-                    return cmp;
-                } else {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
-                    int j1 = o1.getTupleIndex();
-                    int j2 = o2.getTupleIndex();
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-            }
-
-        };
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
deleted file mode 100644
index a846e4a..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class HybridHashSortRunMerger {
-
-    private final IHyracksTaskContext ctx;
-    private final List<RunFileReader> runs;
-    private final int[] keyFields;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDesc;
-    private final int framesLimit;
-    private final int tableSize;
-    private final IFrameWriter writer;
-    private final IAggregatorDescriptor grouper;
-    private final ITuplePartitionComputer tpc;
-    private ByteBuffer outFrame;
-    private FrameTupleAppender outFrameAppender;
-    private final boolean isLoadBuffered;
-
-    public HybridHashSortRunMerger(IHyracksTaskContext ctx, LinkedList<RunFileReader> runs, int[] keyFields,
-            IBinaryComparator[] comparators, RecordDescriptor recordDesc, ITuplePartitionComputer tpc,
-            IAggregatorDescriptor grouper, int framesLimit, int tableSize, IFrameWriter writer, boolean isLoadBuffered) {
-        this.ctx = ctx;
-        this.runs = runs;
-        this.keyFields = keyFields;
-        this.comparators = comparators;
-        this.recordDesc = recordDesc;
-        this.framesLimit = framesLimit;
-        this.writer = writer;
-        this.isLoadBuffered = isLoadBuffered;
-        this.tableSize = tableSize;
-        this.tpc = tpc;
-        this.grouper = grouper;
-    }
-
-    public void process() throws HyracksDataException {
-        
-        // FIXME
-        int mergeLevels = 0, mergeRunCount = 0;
-        try {
-
-            outFrame = ctx.allocateFrame();
-            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-            outFrameAppender.reset(outFrame, true);
-
-            int maxMergeWidth = framesLimit - 1;
-            while (runs.size() > maxMergeWidth) {
-                int generationSeparator = 0;
-                // FIXME
-                int mergeRounds = 0;
-                while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
-                    int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth), runs.size()
-                            - maxMergeWidth + 1);
-                    FileReference newRun = null;
-                    IFrameWriter mergeResultWriter = this.writer;
-                    newRun = ctx.createManagedWorkspaceFile(HybridHashSortRunMerger.class.getSimpleName());
-                    mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
-                    mergeResultWriter.open();
-                    IFrameReader[] runCursors = new RunFileReader[mergeWidth];
-                    for (int i = 0; i < mergeWidth; i++) {
-                        runCursors[i] = runs.get(generationSeparator + i);
-                    }
-                    merge(mergeResultWriter, runCursors, false);
-                    runs.subList(generationSeparator, generationSeparator + mergeWidth).clear();
-                    runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
-                    mergeRounds++;
-                }
-                mergeLevels++;
-                mergeRunCount += mergeRounds;
-            }
-            if (!runs.isEmpty()) {
-                IFrameReader[] runCursors = new RunFileReader[runs.size()];
-                for (int i = 0; i < runCursors.length; i++) {
-                    runCursors[i] = runs.get(i);
-                }
-                merge(writer, runCursors, true);
-            }
-        } catch (Exception e) {
-            writer.fail();
-            throw new HyracksDataException(e);
-        } finally {
-
-            ctx.getCounterContext()
-                    .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.runs.count", true)
-                    .set(mergeRunCount);
-
-            ctx.getCounterContext()
-                    .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.levels", true)
-                    .set(mergeLevels);
-        }
-    }
-
-    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors, boolean isFinal)
-            throws HyracksDataException {
-        // FIXME
-        long methodTimer = System.nanoTime();
-
-        IFrameReader merger = new GroupRunMergingFrameReader(ctx, runCursors, framesLimit, tableSize, keyFields, tpc,
-                comparators, grouper, recordDesc, isFinal, isLoadBuffered);
-        merger.open();
-        try {
-            while (merger.nextFrame(outFrame)) {
-                FrameUtils.flushFrame(outFrame, mergeResultWriter);
-            }
-        } finally {
-            merger.close();
-        }
-        ctx.getCounterContext()
-                .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.time", true)
-                .update(System.nanoTime() - methodTimer);
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
deleted file mode 100644
index 3c91fea..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferenceEntryWithBucketID extends ReferenceEntry {
-
-    private int bucketID;
-
-    public ReferenceEntryWithBucketID(int runid, FrameTupleAccessor fta, int tupleIndex, int bucketID) {
-        super(runid, fta, tupleIndex);
-        this.bucketID = bucketID;
-    }
-
-    public int getBucketID() {
-        return bucketID;
-    }
-
-    public void setBucketID(int bucketID) {
-        this.bucketID = bucketID;
-    }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
deleted file mode 100644
index 394f0a8..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferenceHashEntry extends ReferenceEntry {
-
-    private int hashValue;
-
-    public ReferenceHashEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int hashVal) {
-        super(runid, fta, tupleIndex);
-        this.hashValue = hashVal;
-    }
-
-    public int getHashValue() {
-        return hashValue;
-    }
-
-    public void setHashValue(int hashVal) {
-        this.hashValue = hashVal;
-    }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
deleted file mode 100644
index adfbe81..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Comparator;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferencedBucketBasedPriorityQueue {
-
-    private final int frameSize;
-    private final RecordDescriptor recordDescriptor;
-    private final ReferenceHashEntry entries[];
-    private final int size;
-    private final BitSet runAvail;
-    private int nItems;
-    private final int tableSize;
-
-    private final Comparator<ReferenceHashEntry> comparator;
-
-    private final ITuplePartitionComputer tpc;
-
-    public ReferencedBucketBasedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            Comparator<ReferenceHashEntry> comparator, ITuplePartitionComputer tpc, int tableSize) {
-        this.frameSize = frameSize;
-        this.recordDescriptor = recordDescriptor;
-        if (initSize < 1)
-            throw new IllegalArgumentException();
-        this.comparator = comparator;
-        nItems = initSize;
-        size = (initSize + 1) & 0xfffffffe;
-        entries = new ReferenceHashEntry[size];
-        runAvail = new BitSet(size);
-        runAvail.set(0, initSize, true);
-        for (int i = 0; i < size; i++) {
-            entries[i] = new ReferenceHashEntry(i, null, -1, -1);
-        }
-        this.tpc = tpc;
-        this.tableSize = tableSize;
-    }
-
-    /**
-     * Retrieve the top entry without removing it
-     * 
-     * @return the top entry
-     */
-    public ReferenceEntry peek() {
-        return entries[0];
-    }
-
-    /**
-     * compare the new entry with entries within the queue, to find a spot for
-     * this new entry
-     * 
-     * @param entry
-     * @return runid of this entry
-     * @throws HyracksDataException
-     * @throws IOException
-     */
-    public int popAndReplace(FrameTupleAccessor fta, int tIndex) throws HyracksDataException {
-        ReferenceHashEntry entry = entries[0];
-        if (entry.getAccessor() == null) {
-            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
-        }
-        entry.getAccessor().reset(fta.getBuffer());
-        entry.setTupleIndex(tIndex);
-        entry.setHashValue(tpc.partition(fta, tIndex, tableSize));
-
-        add(entry);
-        return entry.getRunid();
-    }
-
-    /**
-     * Push entry into priority queue
-     * 
-     * @param e
-     *            the new Entry
-     * @throws HyracksDataException
-     */
-    private void add(ReferenceHashEntry e) throws HyracksDataException {
-        ReferenceHashEntry min = entries[0];
-        int slot = (size >> 1) + (min.getRunid() >> 1);
-
-        ReferenceHashEntry curr = e;
-        while (!runAvail.isEmpty() && slot > 0) {
-            int c = 0;
-            if (!runAvail.get(entries[slot].getRunid())) {
-                // run of entries[slot] is exhausted, i.e. not available, curr
-                // wins
-                c = 1;
-            } else if (entries[slot].getAccessor() != null /*
-                                                            * entries[slot] is
-                                                            * not MIN value
-                                                            */
-                    && runAvail.get(curr.getRunid() /* curr run is available */)) {
-
-                if (curr.getAccessor() != null) {
-                    c = comparator.compare(entries[slot], curr);
-                } else {
-                    // curr is MIN value, wins
-                    c = 1;
-                }
-            }
-
-            if (c <= 0) { // curr lost
-                // entries[slot] swaps up
-                ReferenceHashEntry tmp = entries[slot];
-                entries[slot] = curr;
-                curr = tmp;// winner to pass up
-            }// else curr wins
-            slot >>= 1;
-        }
-        // set new entries[0]
-        entries[0] = curr;
-    }
-
-    /**
-     * Pop is called only when a run is exhausted
-     * 
-     * @return
-     * @throws HyracksDataException
-     */
-    public ReferenceHashEntry pop() throws HyracksDataException {
-        ReferenceHashEntry min = entries[0];
-        runAvail.clear(min.getRunid());
-        add(min);
-        nItems--;
-        return min;
-    }
-
-    public boolean areRunsExhausted() {
-        return runAvail.isEmpty();
-    }
-
-    public int size() {
-        return nItems;
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
deleted file mode 100644
index d9d5118..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Comparator;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * TODO need to be merged with the ReferencedPriorityQueue in the util package
- */
-public class ReferencedPriorityQueue {
-    private final int frameSize;
-    private final RecordDescriptor recordDescriptor;
-    private final ReferenceEntryWithBucketID entries[];
-    private final int size;
-    private final BitSet runAvail;
-    private int nItems;
-
-    private final Comparator<ReferenceEntryWithBucketID> comparator;
-
-    public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            Comparator<ReferenceEntryWithBucketID> comparator) {
-        this.frameSize = frameSize;
-        this.recordDescriptor = recordDescriptor;
-        if (initSize < 1)
-            throw new IllegalArgumentException();
-        this.comparator = comparator;
-        nItems = initSize;
-        size = (initSize + 1) & 0xfffffffe;
-        entries = new ReferenceEntryWithBucketID[size];
-        runAvail = new BitSet(size);
-        runAvail.set(0, initSize, true);
-        for (int i = 0; i < size; i++) {
-            entries[i] = new ReferenceEntryWithBucketID(i, null, -1, -1);
-        }
-    }
-
-    /**
-     * Retrieve the top entry without removing it
-     * 
-     * @return the top entry
-     */
-    public ReferenceEntryWithBucketID peek() {
-        return entries[0];
-    }
-
-    /**
-     * compare the new entry with entries within the queue, to find a spot for
-     * this new entry
-     * 
-     * @param entry
-     * @return runid of this entry
-     * @throws IOException
-     */
-    public int popAndReplace(FrameTupleAccessor fta, int tIndex, int bucketID) {
-        ReferenceEntryWithBucketID entry = entries[0];
-        if (entry.getAccessor() == null) {
-            entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
-        }
-        entry.getAccessor().reset(fta.getBuffer());
-        entry.setTupleIndex(tIndex);
-        entry.setBucketID(bucketID);
-
-        add(entry);
-        return entry.getRunid();
-    }
-
-    /**
-     * Push entry into priority queue
-     * 
-     * @param e
-     *            the new Entry
-     */
-    private void add(ReferenceEntryWithBucketID e) {
-        ReferenceEntryWithBucketID min = entries[0];
-        int slot = (size >> 1) + (min.getRunid() >> 1);
-
-        ReferenceEntryWithBucketID curr = e;
-        while (!runAvail.isEmpty() && slot > 0) {
-            int c = 0;
-            if (!runAvail.get(entries[slot].getRunid())) {
-                // run of entries[slot] is exhausted, i.e. not available, curr
-                // wins
-                c = 1;
-            } else if (entries[slot].getAccessor() != null /*
-                                                            * entries[slot] is
-                                                            * not MIN value
-                                                            */
-                    && runAvail.get(curr.getRunid() /* curr run is available */)) {
-
-                if (curr.getAccessor() != null) {
-                    c = comparator.compare(entries[slot], curr);
-                } else {
-                    // curr is MIN value, wins
-                    c = 1;
-                }
-            }
-
-            if (c <= 0) { // curr lost
-                // entries[slot] swaps up
-                ReferenceEntryWithBucketID tmp = entries[slot];
-                entries[slot] = curr;
-                curr = tmp;// winner to pass up
-            }// else curr wins
-            slot >>= 1;
-        }
-        // set new entries[0]
-        entries[0] = curr;
-    }
-
-    /**
-     * Pop is called only when a run is exhausted
-     * 
-     * @return
-     */
-    public ReferenceEntryWithBucketID pop() {
-        ReferenceEntryWithBucketID min = entries[0];
-        runAvail.clear(min.getRunid());
-        add(min);
-        nItems--;
-        return min;
-    }
-
-    public boolean areRunsExhausted() {
-        return runAvail.isEmpty();
-    }
-
-    public int size() {
-        return nItems;
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
deleted file mode 100644
index 72bae76..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class FrameTupleAccessorForGroupHashtable implements IFrameTupleAccessor {
-    private final int frameSize;
-    private final RecordDescriptor recordDescriptor;
-
-    private final static int INT_SIZE = 4;
-
-    private ByteBuffer buffer;
-
-    public FrameTupleAccessorForGroupHashtable(int frameSize, RecordDescriptor recordDescriptor) {
-        this.frameSize = frameSize;
-        this.recordDescriptor = recordDescriptor;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldCount()
-     */
-    @Override
-    public int getFieldCount() {
-        return recordDescriptor.getFieldCount();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldSlotsLength()
-     */
-    @Override
-    public int getFieldSlotsLength() {
-        return getFieldCount() * 4;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldEndOffset(int, int)
-     */
-    @Override
-    public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldStartOffset(int, int)
-     */
-    @Override
-    public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldLength(int, int)
-     */
-    @Override
-    public int getFieldLength(int tupleIndex, int fIdx) {
-        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleEndOffset(int)
-     */
-    @Override
-    public int getTupleEndOffset(int tupleIndex) {
-        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1)) - 2 * INT_SIZE;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleStartOffset(int)
-     */
-    @Override
-    public int getTupleStartOffset(int tupleIndex) {
-        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleCount()
-     */
-    @Override
-    public int getTupleCount() {
-        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getBuffer()
-     */
-    @Override
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#reset(java.nio.ByteBuffer)
-     */
-    @Override
-    public void reset(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public int getTupleHashReferenceOffset(int tupleIndex) {
-        return getTupleEndOffset(tupleIndex);
-    }
-
-    public int getTupleEndOffsetWithHashReference(int tupleIndex) {
-        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
-    }
-
-    public int getHashReferenceNextFrameIndex(int tupleIndex) {
-        return buffer.getInt(getTupleHashReferenceOffset(tupleIndex));
-    }
-
-    public int getHashReferenceNextTupleIndex(int tupleIndex) {
-        return buffer.getInt(getTupleHashReferenceOffset(tupleIndex) + INT_SIZE);
-    }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
deleted file mode 100644
index c5668f5..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-
-public class FrameTupleAppenderForGroupHashtable {
-    private final int frameSize;
-
-    private ByteBuffer buffer;
-
-    private int tupleCount;
-
-    private int tupleDataEndOffset;
-
-    public FrameTupleAppenderForGroupHashtable(int frameSize) {
-        this.frameSize = frameSize;
-    }
-
-    public void reset(ByteBuffer buffer, boolean clear) {
-        this.buffer = buffer;
-        if (clear) {
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
-            tupleCount = 0;
-            tupleDataEndOffset = 0;
-        } else {
-            tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
-            tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)
-                    - tupleCount * 4);
-        }
-    }
-
-    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
-            }
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 4, length);
-            buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length, -1);
-            buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length + 4, -1);
-            tupleDataEndOffset += fieldSlots.length * 4 + length + 2 * 4;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
-            ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
-            return true;
-        }
-        return false;
-    }
-
-    public boolean append(byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
-            buffer.putInt(tupleDataEndOffset + length, -1);
-            buffer.putInt(tupleDataEndOffset + length + 4, -1);
-            tupleDataEndOffset += length + 2 * 4;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
-            ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
-            return true;
-        }
-        return false;
-    }
-
-    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
-        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            int effectiveSlots = 0;
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                if (fieldSlots[i] > 0) {
-                    buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
-                    effectiveSlots++;
-                }
-            }
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
-            buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length, -1);
-            buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length + 4, -1);
-            tupleDataEndOffset += effectiveSlots * 4 + length + 2 * 4;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
-            ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
-            return true;
-        }
-        return false;
-    }
-
-    public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
-        int length = tEndOffset - tStartOffset;
-        if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
-            ByteBuffer src = tupleAccessor.getBuffer();
-            System.arraycopy(src.array(), tStartOffset, buffer.array(), tupleDataEndOffset, length);
-            buffer.putInt(tupleDataEndOffset + length, -1);
-            buffer.putInt(tupleDataEndOffset + length + 4, -1);
-            tupleDataEndOffset += length + 2 * 4;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
-            ++tupleCount;
-            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
-            return true;
-        }
-        return false;
-    }
-
-    public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) {
-        int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
-        int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
-        return append(tupleAccessor, tStartOffset, tEndOffset);
-    }
-
-    public int getTupleCount() {
-        return tupleCount;
-    }
-
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
-}
-
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
deleted file mode 100644
index b325b83..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HybridHashGroupHashTable implements IFrameWriter {
-
-    private final static int HEADER_REF_EMPTY = -1;
-
-    private static final int INT_SIZE = 4;
-
-    private IHyracksTaskContext ctx;
-
-    private final int frameSize;
-
-    private final int framesLimit;
-
-    private final int tableSize;
-
-    private final int numOfPartitions;
-
-    private final IFrameWriter outputWriter;
-
-    private final IBinaryComparator[] comparators;
-
-    /**
-     * index for keys
-     */
-    private final int[] inputKeys, internalKeys;
-
-    private final RecordDescriptor inputRecordDescriptor, outputRecordDescriptor;
-
-    /**
-     * hash partitioner for hashing
-     */
-    private final ITuplePartitionComputer hashComputer;
-
-    /**
-     * hash partitioner for partitioning
-     */
-    private final ITuplePartitionComputer partitionComputer;
-
-    /**
-     * Hashtable haders
-     */
-    private ByteBuffer[] headers;
-
-    /**
-     * buffers for hash table
-     */
-    private ByteBuffer[] contents;
-
-    /**
-     * output buffers for spilled partitions
-     */
-    private ByteBuffer[] spilledPartOutputBuffers;
-
-    /**
-     * run writers for spilled partitions
-     */
-    private RunFileWriter[] spilledPartRunWriters;
-
-    private int[] spilledPartRunSizeArrayInFrames;
-    private int[] spilledPartRunSizeArrayInTuples;
-
-    private List<IFrameReader> spilledPartRunReaders = null;
-    private List<Integer> spilledRunAggregatedPages = null;
-    private List<Integer> spilledPartRunSizesInFrames = null;
-    private List<Integer> spilledPartRunSizesInTuples = null;
-
-    /**
-     * index of the current working buffer in hash table
-     */
-    private int currentHashTableFrame;
-
-    /**
-     * Aggregation state
-     */
-    private AggregateState htAggregateState;
-
-    /**
-     * the aggregator
-     */
-    private final IAggregatorDescriptor aggregator;
-
-    /**
-     * records inserted into the in-memory hash table (for hashing and aggregation)
-     */
-    private int hashedRawRecords = 0;
-
-    /**
-     * in-memory part size in tuples
-     */
-    private int hashedKeys = 0;
-
-    /**
-     * Hash table tuple pointer
-     */
-    private TuplePointer matchPointer;
-
-    /**
-     * Frame tuple accessor for input data frames
-     */
-    private FrameTupleAccessor inputFrameTupleAccessor;
-
-    /**
-     * flag for whether the hash table if full
-     */
-    private boolean isHashtableFull;
-
-    /**
-     * flag for only partition (no aggregation and hashing)
-     */
-    private boolean isPartitionOnly;
-
-    /**
-     * Tuple accessor for hash table contents
-     */
-    private FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
-
-    private ArrayTupleBuilder internalTupleBuilder;
-
-    private FrameTupleAppender spilledPartInsertAppender;
-
-    private FrameTupleAppenderForGroupHashtable htInsertAppender;
-
-    public HybridHashGroupHashTable(IHyracksTaskContext ctx, int framesLimit, int tableSize, int numOfPartitions,
-            int[] keys, int hashSeedOffset, IBinaryComparator[] comparators, ITuplePartitionComputerFamily tpcFamily,
-            IAggregatorDescriptor aggregator, RecordDescriptor inputRecordDescriptor,
-            RecordDescriptor outputRecordDescriptor, IFrameWriter outputWriter) throws HyracksDataException {
-        this.ctx = ctx;
-        this.frameSize = ctx.getFrameSize();
-        this.tableSize = tableSize;
-        this.framesLimit = framesLimit;
-        this.numOfPartitions = numOfPartitions;
-        this.inputKeys = keys;
-        this.internalKeys = new int[keys.length];
-        for (int i = 0; i < internalKeys.length; i++) {
-            internalKeys[i] = i;
-        }
-
-        this.comparators = comparators;
-
-        this.inputRecordDescriptor = inputRecordDescriptor;
-        this.outputRecordDescriptor = outputRecordDescriptor;
-
-        this.outputWriter = outputWriter;
-
-        this.hashComputer = tpcFamily.createPartitioner(hashSeedOffset * 2);
-        this.partitionComputer = tpcFamily.createPartitioner(hashSeedOffset * 2 + 1);
-
-        this.aggregator = aggregator;
-
-    }
-
-    public static double getHashtableOverheadRatio(int tableSize, int frameSize, int framesLimit, int recordSizeInByte) {
-        int pagesForRecord = framesLimit - getHeaderPages(tableSize, frameSize);
-        int recordsInHashtable = (pagesForRecord - 1) * ((int) (frameSize / (recordSizeInByte + 2 * INT_SIZE)));
-
-        return (double) framesLimit * frameSize / recordsInHashtable / recordSizeInByte;
-    }
-
-    public static int getHeaderPages(int tableSize, int frameSize) {
-        return (int) Math.ceil((double)tableSize * INT_SIZE * 2 / frameSize);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        // initialize hash headers
-        int htHeaderCount = getHeaderPages(tableSize, frameSize);
-
-        isPartitionOnly = false;
-        if (numOfPartitions >= framesLimit - htHeaderCount) {
-            isPartitionOnly = true;
-        }
-
-        if (isPartitionOnly) {
-            htHeaderCount = 0;
-        }
-
-        headers = new ByteBuffer[htHeaderCount];
-
-        // initialize hash table contents
-        contents = new ByteBuffer[framesLimit - htHeaderCount - numOfPartitions];
-        currentHashTableFrame = 0;
-        isHashtableFull = false;
-
-        // initialize hash table aggregate state
-        htAggregateState = aggregator.createAggregateStates();
-
-        // initialize partition information
-        spilledPartOutputBuffers = new ByteBuffer[numOfPartitions];
-        spilledPartRunWriters = new RunFileWriter[numOfPartitions];
-        spilledPartRunSizeArrayInFrames = new int[numOfPartitions];
-        spilledPartRunSizeArrayInTuples = new int[numOfPartitions];
-
-        // initialize other helper classes
-        inputFrameTupleAccessor = new FrameTupleAccessor(frameSize, inputRecordDescriptor);
-        internalTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
-        spilledPartInsertAppender = new FrameTupleAppender(frameSize);
-
-        htInsertAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
-        matchPointer = new TuplePointer();
-        hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outputRecordDescriptor);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inputFrameTupleAccessor.reset(buffer);
-        int tupleCount = inputFrameTupleAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; i++) {
-            insert(inputFrameTupleAccessor, i);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (spilledPartRunWriters[i] != null) {
-                spilledPartRunWriters[i].close();
-            }
-        }
-        htAggregateState.close();
-    }
-
-    private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
-
-        if (isPartitionOnly) {
-            // for partition only
-            int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
-            insertSpilledPartition(accessor, tupleIndex, pid);
-            spilledPartRunSizeArrayInTuples[pid]++;
-            return;
-        }
-
-        int hid = hashComputer.partition(accessor, tupleIndex, tableSize);
-
-        if (findMatch(hid, accessor, tupleIndex)) {
-            // found a matching: do aggregation
-            hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
-            aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex,
-                    htAggregateState);
-            hashedRawRecords++;
-        } else {
-            if (isHashtableFull) {
-                // when hash table is full: spill the record
-                int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
-                insertSpilledPartition(accessor, tupleIndex, pid);
-                spilledPartRunSizeArrayInTuples[pid]++;
-            } else {
-                // insert a new entry into the hash table
-                internalTupleBuilder.reset();
-                for (int k = 0; k < inputKeys.length; k++) {
-                    internalTupleBuilder.addField(accessor, tupleIndex, inputKeys[k]);
-                }
-
-                aggregator.init(internalTupleBuilder, accessor, tupleIndex, htAggregateState);
-
-                if (contents[currentHashTableFrame] == null) {
-                    contents[currentHashTableFrame] = ctx.allocateFrame();
-                }
-
-                htInsertAppender.reset(contents[currentHashTableFrame], false);
-                if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
-                        internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                    // hash table is full: try to allocate more frame
-                    currentHashTableFrame++;
-                    if (currentHashTableFrame >= contents.length) {
-                        // no more frame to allocate: stop expending the hash table
-                        isHashtableFull = true;
-
-                        // reinsert the record
-                        insert(accessor, tupleIndex);
-
-                        return;
-                    } else {
-                        if (contents[currentHashTableFrame] == null) {
-                            contents[currentHashTableFrame] = ctx.allocateFrame();
-                        }
-
-                        htInsertAppender.reset(contents[currentHashTableFrame], true);
-
-                        if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
-                                internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                            throw new HyracksDataException(
-                                    "Failed to insert an aggregation partial result into the in-memory hash table: it has the length of "
-                                            + internalTupleBuilder.getSize() + " and fields "
-                                            + internalTupleBuilder.getFieldEndOffsets().length);
-                        }
-
-                    }
-                }
-
-                // update hash table reference
-                if (matchPointer.frameIndex < 0) {
-                    // need to initialize the hash table header
-                    int headerFrameIndex = getHeaderFrameIndex(hid);
-                    int headerFrameOffset = getHeaderTupleIndex(hid);
-
-                    if (headers[headerFrameIndex] == null) {
-                        headers[headerFrameIndex] = ctx.allocateFrame();
-                        resetHeader(headerFrameIndex);
-                    }
-
-                    headers[headerFrameIndex].putInt(headerFrameOffset, currentHashTableFrame);
-                    headers[headerFrameIndex]
-                            .putInt(headerFrameOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
-                } else {
-                    // update the previous reference
-                    hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
-                    int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
-                    contents[matchPointer.frameIndex].putInt(refOffset, currentHashTableFrame);
-                    contents[matchPointer.frameIndex]
-                            .putInt(refOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
-                }
-
-                hashedKeys++;
-                hashedRawRecords++;
-            }
-        }
-    }
-
-    /**
-     * Insert record into a spilled partition, by directly copying the tuple into the output buffer.
-     * 
-     * @param accessor
-     * @param tupleIndex
-     * @param pid
-     */
-    private void insertSpilledPartition(FrameTupleAccessor accessor, int tupleIndex, int pid)
-            throws HyracksDataException {
-
-        if (spilledPartOutputBuffers[pid] == null) {
-            spilledPartOutputBuffers[pid] = ctx.allocateFrame();
-        }
-
-        spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], false);
-
-        if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
-            // the output buffer is full: flush
-            flushSpilledPartitionOutputBuffer(pid);
-            // reset the output buffer
-            spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], true);
-
-            if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
-                throw new HyracksDataException("Failed to insert a record into a spilled partition!");
-            }
-        }
-
-    }
-
-    /**
-     * Flush a spilled partition's output buffer.
-     * 
-     * @param pid
-     * @throws HyracksDataException
-     */
-    private void flushSpilledPartitionOutputBuffer(int pid) throws HyracksDataException {
-        if (spilledPartRunWriters[pid] == null) {
-            spilledPartRunWriters[pid] = new RunFileWriter(
-                    ctx.createManagedWorkspaceFile("HashHashPrePartitionHashTable"), ctx.getIOManager());
-            spilledPartRunWriters[pid].open();
-        }
-
-        FrameUtils.flushFrame(spilledPartOutputBuffers[pid], spilledPartRunWriters[pid]);
-
-        spilledPartRunSizeArrayInFrames[pid]++;
-    }
-
-    /**
-     * Hash table lookup
-     * 
-     * @param hid
-     * @param accessor
-     * @param tupleIndex
-     * @return
-     */
-    private boolean findMatch(int hid, FrameTupleAccessor accessor, int tupleIndex) {
-
-        matchPointer.frameIndex = -1;
-        matchPointer.tupleIndex = -1;
-
-        // get reference in the header
-        int headerFrameIndex = getHeaderFrameIndex(hid);
-        int headerFrameOffset = getHeaderTupleIndex(hid);
-
-        if (headers[headerFrameIndex] == null) {
-            return false;
-        }
-
-        // initialize the pointer to the first record 
-        int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
-        int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
-        while (entryFrameIndex >= 0) {
-            matchPointer.frameIndex = entryFrameIndex;
-            matchPointer.tupleIndex = entryTupleIndex;
-            hashtableRecordAccessor.reset(contents[entryFrameIndex]);
-            if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
-                return true;
-            }
-            // Move to the next record in this entry following the linked list
-            int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
-            int prevFrameIndex = entryFrameIndex;
-            entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
-            entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
-        }
-
-        return false;
-    }
-
-    public void finishup() throws HyracksDataException {
-        // spill all output buffers
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (spilledPartOutputBuffers[i] != null) {
-                flushSpilledPartitionOutputBuffer(i);
-            }
-        }
-        spilledPartOutputBuffers = null;
-
-        // flush in-memory aggregation results: no more frame cost here as all output buffers are recycled
-        ByteBuffer outputBuffer = ctx.allocateFrame();
-        FrameTupleAppender outputBufferAppender = new FrameTupleAppender(frameSize);
-        outputBufferAppender.reset(outputBuffer, true);
-
-        ArrayTupleBuilder outFlushTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
-
-        for (ByteBuffer htFrame : contents) {
-            if (htFrame == null) {
-                continue;
-            }
-            hashtableRecordAccessor.reset(htFrame);
-            int tupleCount = hashtableRecordAccessor.getTupleCount();
-            for (int i = 0; i < tupleCount; i++) {
-                outFlushTupleBuilder.reset();
-
-                for (int k = 0; k < internalKeys.length; k++) {
-                    outFlushTupleBuilder.addField(hashtableRecordAccessor, i, internalKeys[k]);
-                }
-
-                aggregator.outputFinalResult(outFlushTupleBuilder, hashtableRecordAccessor, i, htAggregateState);
-
-                if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
-                        outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
-                    FrameUtils.flushFrame(outputBuffer, outputWriter);
-                    outputBufferAppender.reset(outputBuffer, true);
-
-                    if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
-                            outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
-                        throw new HyracksDataException(
-                                "Failed to flush a record from in-memory hash table: record has length of "
-                                        + outFlushTupleBuilder.getSize() + " and fields "
-                                        + outFlushTupleBuilder.getFieldEndOffsets().length);
-                    }
-                }
-            }
-        }
-
-        if (outputBufferAppender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outputBuffer, outputWriter);
-        }
-
-        // create run readers and statistic information for spilled runs
-        spilledPartRunReaders = new LinkedList<IFrameReader>();
-        spilledRunAggregatedPages = new LinkedList<Integer>();
-        spilledPartRunSizesInFrames = new LinkedList<Integer>();
-        spilledPartRunSizesInTuples = new LinkedList<Integer>();
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (spilledPartRunWriters[i] != null) {
-                spilledPartRunReaders.add(spilledPartRunWriters[i].createReader());
-                spilledRunAggregatedPages.add(0);
-                spilledPartRunWriters[i].close();
-                spilledPartRunSizesInFrames.add(spilledPartRunSizeArrayInFrames[i]);
-                spilledPartRunSizesInTuples.add(spilledPartRunSizeArrayInTuples[i]);
-            }
-        }
-    }
-
-    /**
-     * Compare an input record with a hash table entry.
-     * 
-     * @param accessor
-     * @param tupleIndex
-     * @param hashAccessor
-     * @param hashTupleIndex
-     * @return
-     */
-    private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
-            int hashTupleIndex) {
-        int tStart0 = accessor.getTupleStartOffset(tupleIndex);
-        int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
-
-        int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
-        int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
-
-        for (int i = 0; i < internalKeys.length; ++i) {
-            int fStart0 = accessor.getFieldStartOffset(tupleIndex, inputKeys[i]);
-            int fEnd0 = accessor.getFieldEndOffset(tupleIndex, inputKeys[i]);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
-            int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    /**
-     * Get the header frame index of the given hash table entry
-     * 
-     * @param entry
-     * @return
-     */
-    private int getHeaderFrameIndex(int entry) {
-        int frameIndex = (entry / frameSize * 2 * INT_SIZE) + (entry % frameSize * 2 * INT_SIZE / frameSize);
-        return frameIndex;
-    }
-
-    /**
-     * Get the tuple index of the given hash table entry
-     * 
-     * @param entry
-     * @return
-     */
-    private int getHeaderTupleIndex(int entry) {
-        int offset = (entry % frameSize) * 2 * INT_SIZE % frameSize;
-        return offset;
-    }
-
-    /**
-     * reset the header page.
-     * 
-     * @param headerFrameIndex
-     */
-    private void resetHeader(int headerFrameIndex) {
-        for (int i = 0; i < frameSize; i += INT_SIZE) {
-            headers[headerFrameIndex].putInt(i, HEADER_REF_EMPTY);
-        }
-    }
-
-    public List<Integer> getSpilledRunsSizeInRawTuples() throws HyracksDataException {
-        return spilledPartRunSizesInTuples;
-    }
-
-    public int getHashedUniqueKeys() throws HyracksDataException {
-        return hashedKeys;
-    }
-
-    public int getHashedRawRecords() throws HyracksDataException {
-        return hashedRawRecords;
-    }
-
-    public List<Integer> getSpilledRunsAggregatedPages() throws HyracksDataException {
-        return spilledRunAggregatedPages;
-    }
-
-    public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
-        return spilledPartRunReaders;
-    }
-
-    public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
-        return spilledPartRunSizesInFrames;
-    }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
deleted file mode 100644
index 118ca75..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortGroupHashTable;
-import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortRunMerger;
-
-public class HybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final double HYBRID_FALLBACK_THRESHOLD = 0.8;
-
-    // merge with fudge factor
-    private static final double ESTIMATOR_MAGNIFIER = 1.2;
-
-    // input key fields
-    private final int[] keyFields;
-
-    // intermediate and final key fields
-    private final int[] storedKeyFields;
-
-    /**
-     * Input sizes as the count of the raw records.
-     */
-    private final long inputSizeInRawRecords;
-
-    /**
-     * Input size as the count of the unique keys.
-     */
-    private final long inputSizeInUniqueKeys;
-
-    // hash table size
-    private final int tableSize;
-
-    // estimated record size: used for compute the fudge factor
-    private final int userProvidedRecordSizeInBytes;
-
-    // aggregator
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-
-    // merger, in case of falling back to the hash-sort algorithm for hash skewness
-    private final IAggregatorDescriptorFactory mergerFactory;
-
-    // for the sort fall-back algorithm
-    private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
-    // total memory in pages
-    private final int framesLimit;
-
-    // comparator factories for key fields.
-    private final IBinaryComparatorFactory[] comparatorFactories;
-
-    /**
-     * hash families for each field: a hash function family is need as we may have
-     * more than one levels of hashing
-     */
-    private final IBinaryHashFunctionFamily[] hashFamilies;
-
-    /**
-     * Flag for input adjustment
-     */
-    private final boolean doInputAdjustment;
-
-    private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
-
-    public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
-            IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
-            int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor outRecDesc) throws HyracksDataException {
-        this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
-                comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
-                mergerFactory, outRecDesc, true);
-    }
-
-    public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
-            IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
-            int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor outRecDesc, boolean doInputAdjustment) throws HyracksDataException {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        this.tableSize = tableSize;
-        this.userProvidedRecordSizeInBytes = recordSizeInBytes;
-
-        this.inputSizeInRawRecords = inputSizeInRawRecords;
-        this.inputSizeInUniqueKeys = inputSizeInUniqueKeys;
-
-        if (framesLimit <= 3) {
-            // at least 3 frames: 2 for in-memory hash table, and 1 for output buffer
-            throw new HyracksDataException(
-                    "Not enough memory for Hash-Hash Aggregation algorithm: at least 3 frames are necessary, but only "
-                            + framesLimit + " available.");
-        }
-
-        this.keyFields = keyFields;
-        storedKeyFields = new int[keyFields.length];
-        for (int i = 0; i < storedKeyFields.length; i++) {
-            storedKeyFields[i] = i;
-        }
-
-        this.aggregatorFactory = aggregatorFactory;
-
-        this.mergerFactory = mergerFactory;
-        this.firstNormalizerFactory = firstNormalizerFactory;
-
-        this.comparatorFactories = comparatorFactories;
-
-        this.hashFamilies = hashFamilies;
-
-        recordDescriptors[0] = outRecDesc;
-
-        this.doInputAdjustment = doInputAdjustment;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparators.length; i++) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-
-        final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-
-        final int frameSize = ctx.getFrameSize();
-
-        final double fudgeFactor = HybridHashGroupHashTable.getHashtableOverheadRatio(tableSize, frameSize,
-                framesLimit, userProvidedRecordSizeInBytes) * FUDGE_FACTOR_ESTIMATION;
-
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-
-            HybridHashGroupHashTable topProcessor;
-
-            int observedInputSizeInRawTuples;
-            int observedInputSizeInFrames, maxRecursiveLevels;
-
-            int userProvidedInputSizeInFrames;
-
-            boolean topLevelFallbackCheck = true;
-
-            ITuplePartitionComputerFamily tpcf = new FieldHashPartitionComputerFamily(keyFields, hashFamilies);
-
-            ITuplePartitionComputerFamily tpcfMerge = new FieldHashPartitionComputerFamily(storedKeyFields,
-                    hashFamilies);
-
-            ByteBuffer readAheadBuf;
-
-            /**
-             * Compute the partition numbers using hybrid-hash formula.
-             * 
-             * @param tableSize
-             * @param framesLimit
-             * @param inputKeySize
-             * @param partitionInOperator
-             * @param factor
-             * @return
-             */
-            private int getNumberOfPartitions(int tableSize, int framesLimit, long inputKeySize, double factor) {
-
-                int hashtableHeaderPages = HybridHashGroupHashTable.getHeaderPages(tableSize, frameSize);
-
-                int numberOfPartitions = HybridHashUtil.hybridHashPartitionComputer((int) Math.ceil(inputKeySize),
-                        framesLimit, factor);
-
-                // if the partition number is more than the available hash table contents, do pure partition.
-                if (numberOfPartitions >= framesLimit - hashtableHeaderPages) {
-                    numberOfPartitions = framesLimit;
-                }
-
-                if (numberOfPartitions <= 0) {
-                    numberOfPartitions = 1;
-                }
-
-                return numberOfPartitions;
-            }
-
-            @Override
-            public void open() throws HyracksDataException {
-
-                observedInputSizeInFrames = 0;
-
-                // estimate the number of unique keys for this partition, given the total raw record count and unique record count
-                long estimatedNumberOfUniqueKeys = HybridHashUtil.getEstimatedPartitionSizeOfUniqueKeys(
-                        inputSizeInRawRecords, inputSizeInUniqueKeys, 1);
-
-                userProvidedInputSizeInFrames = (int) Math.ceil(estimatedNumberOfUniqueKeys
-                        * userProvidedRecordSizeInBytes / frameSize);
-
-                int topPartitions = getNumberOfPartitions(tableSize, framesLimit,
-                        (int) Math.ceil(userProvidedInputSizeInFrames * ESTIMATOR_MAGNIFIER), fudgeFactor);
-
-                topProcessor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize, topPartitions, keyFields, 0,
-                        comparators, tpcf, aggregatorFactory.createAggregator(ctx, inRecDesc, recordDescriptors[0],
-                                keyFields, storedKeyFields), inRecDesc, recordDescriptors[0], writer);
-
-                writer.open();
-                topProcessor.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                observedInputSizeInRawTuples += buffer.getInt(buffer.capacity() - 4);
-                observedInputSizeInFrames++;
-                topProcessor.nextFrame(buffer);
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                // estimate the maximum recursive levels
-                maxRecursiveLevels = (int) Math.max(
-                        Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
-
-                finishAndRecursion(topProcessor, observedInputSizeInRawTuples, inputSizeInUniqueKeys, 0,
-                        topLevelFallbackCheck);
-
-                writer.close();
-
-            }
-
-            private void processRunFiles(IFrameReader runReader, int inputCardinality, int runLevel)
-                    throws HyracksDataException {
-
-                boolean checkFallback = true;
-
-                int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, (long)inputCardinality
-                        * userProvidedRecordSizeInBytes / frameSize, fudgeFactor);
-
-                HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
-                        numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
-                                ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
-                        recordDescriptors[0], writer);
-
-                processor.open();
-
-                runReader.open();
-
-                int inputRunRawSizeInTuples = 0;
-
-                if (readAheadBuf == null) {
-                    readAheadBuf = ctx.allocateFrame();
-                }
-                while (runReader.nextFrame(readAheadBuf)) {
-                    inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
-                    processor.nextFrame(readAheadBuf);
-                }
-
-                runReader.close();
-
-                finishAndRecursion(processor, inputRunRawSizeInTuples, inputCardinality, runLevel, checkFallback);
-            }
-
-            /**
-             * Finish the hash table processing and start recursive processing on run files.
-             * 
-             * @param ht
-             * @param inputRawRecordCount
-             * @param inputCardinality
-             * @param level
-             * @param checkFallback
-             * @throws HyracksDataException
-             */
-            private void finishAndRecursion(HybridHashGroupHashTable ht, long inputRawRecordCount,
-                    long inputCardinality, int level, boolean checkFallback) throws HyracksDataException {
-
-                ht.finishup();
-
-                List<IFrameReader> generatedRunReaders = ht.getSpilledRuns();
-                List<Integer> partitionRawRecords = ht.getSpilledRunsSizeInRawTuples();
-
-                int directFlushKeysInTuples = ht.getHashedUniqueKeys();
-                int directFlushRawRecordsInTuples = ht.getHashedRawRecords();
-
-                ht.close();
-                ht = null;
-
-                ctx.getCounterContext().getCounter("optional.levels." + level + ".estiInputKeyCardinality", true)
-                        .update(inputCardinality);
-
-                // do adjustment
-                if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
-                    inputCardinality = (int) Math.ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
-                            * inputRawRecordCount);
-                }
-
-                ctx.getCounterContext()
-                        .getCounter("optional.levels." + level + ".estiInputKeyCardinalityAdjusted", true)
-                        .update(inputCardinality);
-
-                IFrameReader recurRunReader;
-                int subPartitionRawRecords;
-
-                while (!generatedRunReaders.isEmpty()) {
-                    recurRunReader = generatedRunReaders.remove(0);
-                    subPartitionRawRecords = partitionRawRecords.remove(0);
-
-                    int runKeyCardinality = (int) Math.ceil((double) inputCardinality * subPartitionRawRecords
-                            / inputRawRecordCount);
-
-                    if ((checkFallback && runKeyCardinality > HYBRID_FALLBACK_THRESHOLD * inputCardinality)
-                            || level > maxRecursiveLevels) {
-                        Logger.getLogger(HybridHashGroupOperatorDescriptor.class.getSimpleName()).warning(
-                                "Hybrid-hash falls back to hash-sort algorithm! (" + level + ":" + maxRecursiveLevels
-                                        + ")");
-                        fallback(recurRunReader, level);
-                    } else {
-                        processRunFiles(recurRunReader, runKeyCardinality, level + 1);
-                    }
-
-                }
-            }
-
-            private void fallback(IFrameReader recurRunReader, int runLevel) throws HyracksDataException {
-                // fall back
-                FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
-                HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
-                        keyFields, comparators, tpcf.createPartitioner(runLevel + 1),
-                        firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(ctx,
-                                inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
-                        recordDescriptors[0]);
-
-                recurRunReader.open();
-                if (readAheadBuf == null) {
-                    readAheadBuf = ctx.allocateFrame();
-                }
-                while (recurRunReader.nextFrame(readAheadBuf)) {
-                    runFrameTupleAccessor.reset(readAheadBuf);
-                    int tupleCount = runFrameTupleAccessor.getTupleCount();
-                    for (int j = 0; j < tupleCount; j++) {
-                        hhsTable.insert(runFrameTupleAccessor, j);
-                    }
-                }
-
-                recurRunReader.close();
-                hhsTable.finishup();
-
-                LinkedList<RunFileReader> hhsRuns = hhsTable.getRunFileReaders();
-
-                if (hhsRuns.isEmpty()) {
-                    hhsTable.flushHashtableToOutput(writer);
-                    hhsTable.close();
-                } else {
-                    hhsTable.close();
-                    HybridHashSortRunMerger hhsMerger = new HybridHashSortRunMerger(ctx, hhsRuns, storedKeyFields,
-                            comparators, recordDescriptors[0], tpcfMerge.createPartitioner(runLevel + 1),
-                            mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
-                                    storedKeyFields, storedKeyFields), framesLimit, tableSize, writer, false);
-                    hhsMerger.process();
-                }
-            }
-
-        };
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
deleted file mode 100644
index 5323887..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-public class HybridHashUtil {
-
-    /**
-     * Compute the expected number of spilling partitions (in-memory partition is not included), using the hybrid-hash
-     * algorithm from [Shapiro86]. Note that 0 means that there is no need to have spilling partitions.
-     * 
-     * @param inputSizeInFrames
-     * @param memorySizeInFrames
-     * @param fudgeFactor
-     * @return
-     */
-    public static int hybridHashPartitionComputer(int inputSizeOfUniqueKeysInFrames, int memorySizeInFrames,
-            double fudgeFactor) {
-        return Math.max(
-                (int) Math.ceil((inputSizeOfUniqueKeysInFrames * fudgeFactor - memorySizeInFrames)
-                        / (memorySizeInFrames - 1)), 0);
-    }
-
-    /**
-     * Compute the estimated number of unique keys in a partition of a dataset, using Yao's formula
-     * 
-     * @param inputSizeInRawRecords
-     * @param inputSizeInUniqueKeys
-     * @param numOfPartitions
-     * @return
-     */
-    public static long getEstimatedPartitionSizeOfUniqueKeys(long inputSizeInRawRecords, long inputSizeInUniqueKeys,
-            int numOfPartitions) {
-        if (numOfPartitions == 1) {
-            return inputSizeInUniqueKeys;
-        }
-        return (long) Math.ceil(inputSizeInUniqueKeys
-                * (1 - Math.pow(1 - ((double) inputSizeInRawRecords / (double) numOfPartitions)
-                        / (double) inputSizeInRawRecords, (double) inputSizeInRawRecords
-                        / (double) inputSizeInUniqueKeys)));
-    }
-}