Update issue #52:

Added new aggregator interfaces and group operator implementations, consolidating the previously separate aggregator interfaces into a single design.

A simple integer sum aggregator is added to demonstrate the usage of the new interfaces.
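
For reference, below is a minimal sketch of what such a sum aggregator can look
like against the new interfaces. The method signatures are inferred from the
call sites in this patch; IAggregateState is assumed to expose a
getState()/setState(Object) pair for its opaque payload, imports are elided
(IntegerSerializerDeserializer comes from hyracks-dataflow-common), and the
committed IntSumFieldAggregatorFactory may differ in detail:

    public class IntSumFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
        private static final long serialVersionUID = 1L;
        private final int aggField; // index of the input field to sum (illustrative)

        public IntSumFieldAggregatorFactory(int aggField) {
            this.aggField = aggField;
        }

        @Override
        public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
                RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc) {
            return new IFieldAggregateDescriptor() {
                // Read the aggregated int field out of a frame tuple.
                private int getField(IFrameTupleAccessor accessor, int tIndex) {
                    int tupleOffset = accessor.getTupleStartOffset(tIndex);
                    int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                    return IntegerSerializerDeserializer.getInt(accessor.getBuffer()
                            .array(), tupleOffset + accessor.getFieldSlotsLength()
                            + fieldStart);
                }

                // Write an int big-endian into a byte array (avoids assuming a
                // putInt helper exists in the library).
                private void putInt(int v, byte[] data, int offset) {
                    data[offset] = (byte) (v >>> 24);
                    data[offset + 1] = (byte) (v >>> 16);
                    data[offset + 2] = (byte) (v >>> 8);
                    data[offset + 3] = (byte) v;
                }

                @Override
                public IAggregateState createState() {
                    // Assumed minimal state: a holder for an Object payload.
                    return new IAggregateState() {
                        private Object value;
                        public Object getState() { return value; }
                        public void setState(Object v) { value = v; }
                    };
                }

                @Override
                public void init(IFrameTupleAccessor accessor, int tIndex,
                        DataOutput out, IAggregateState state) throws HyracksDataException {
                    int sum = getField(accessor, tIndex);
                    if (out != null) { // materialize the value into the record
                        try {
                            out.writeInt(sum);
                        } catch (IOException e) {
                            throw new HyracksDataException(e);
                        }
                    } else { // keep the value in the aggregate state
                        state.setState(sum);
                    }
                }

                @Override
                public void aggregate(IFrameTupleAccessor accessor, int tIndex,
                        byte[] data, int offset, IAggregateState state)
                        throws HyracksDataException {
                    int sum = getField(accessor, tIndex);
                    if (data != null) { // partial value lives in a frame: update in place
                        sum += IntegerSerializerDeserializer.getInt(data, offset);
                        putInt(sum, data, offset);
                    } else {
                        state.setState(sum + (Integer) state.getState());
                    }
                }

                @Override
                public void outputFinalResult(DataOutput out, byte[] data, int offset,
                        IAggregateState state) throws HyracksDataException {
                    try {
                        out.writeInt(data != null ? IntegerSerializerDeserializer
                                .getInt(data, offset) : (Integer) state.getState());
                    } catch (IOException e) {
                        throw new HyracksDataException(e);
                    }
                }

                @Override
                public void outputPartialResult(DataOutput out, byte[] data, int offset,
                        IAggregateState state) throws HyracksDataException {
                    // For a sum, the partial result equals the final result.
                    outputFinalResult(out, data, offset, state);
                }

                @Override
                public void close() {
                }
            };
        }
    }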

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@860 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
new file mode 100644
index 0000000..b73cf46
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+/**
+ * External (spillable) group-by operator. Tuples are aggregated into an
+ * in-memory {@link ISpillableTable}; when memory is exhausted the table is
+ * sorted and flushed into a run file, and a merge phase combines the runs
+ * into the final aggregation result.
+ */
+public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+    private static final int MERGE_ACTIVITY_ID = 1;
+
+    private static final long serialVersionUID = 1L;
+    private final int[] keyFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+    private final IFieldAggregateDescriptorFactory[] aggregateFactories;
+    private final IFieldAggregateDescriptorFactory[] mergeFactories;
+    private final int framesLimit;
+    private final ISpillableTableFactory spillableTableFactory;
+    private final boolean isOutputSorted;
+
+    public ExternalGroupOperatorDescriptor(JobSpecification spec,
+            int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory firstNormalizerFactory,
+            IFieldAggregateDescriptorFactory[] aggregateFactories,
+            IFieldAggregateDescriptorFactory[] mergeFactories,
+            RecordDescriptor recordDescriptor,
+            ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        if (framesLimit <= 1) {
+            /**
+             * Minimum of 2 frames: 1 for input records, and 1 for output
+             * aggregation results.
+             */
+            throw new IllegalStateException(
+                    "Frame limit must be at least 2, but is "
+                            + framesLimit + "!");
+        }
+        this.aggregateFactories = aggregateFactories;
+        this.mergeFactories = mergeFactories;
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.spillableTableFactory = spillableTableFactory;
+        this.isOutputSorted = isOutputSorted;
+
+        /**
+         * Set the record descriptor. Note that since this operator is a unary
+         * operator, only the first record descriptor is used here.
+         */
+        recordDescriptors[0] = recordDescriptor;
+    }
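+
+    /*
+     * Typical wiring of this operator (a sketch; the factories and field
+     * indices below are illustrative, not taken from a real job):
+     *
+     *   ExternalGroupOperatorDescriptor grouper =
+     *           new ExternalGroupOperatorDescriptor(spec, new int[] { 0 }, 4,
+     *                   comparatorFactories, normalizerFactory,
+     *                   aggregateFactories, mergeFactories, outRecordDesc,
+     *                   new HashSpillableTableFactory(tpcf, tableSize), true);
+     */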
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+     */
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
+                getOperatorId(), AGGREGATE_ACTIVITY_ID));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(
+                getOperatorId(), MERGE_ACTIVITY_ID));
+
+        builder.addActivity(aggregateAct);
+        builder.addSourceEdge(0, aggregateAct, 0);
+
+        builder.addActivity(mergeAct);
+        builder.addTargetEdge(0, mergeAct, 0);
+
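+        /**
+         * The merge activity can start only after the aggregate activity has
+         * completed, since merging needs the complete set of spilled runs.
+         */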
+        builder.addBlockingEdge(aggregateAct, mergeAct);
+    }
+
+    public static class AggregateActivityState extends AbstractTaskState {
+        private LinkedList<RunFileReader> runs;
+
+        private ISpillableTable gTable;
+
+        public AggregateActivityState() {
+        }
+
+        private AggregateActivityState(JobId jobId, TaskId tId) {
+            super(jobId, tId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class AggregateActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public AggregateActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions)
+                throws HyracksDataException {
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(
+                    ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(
+                            getOperatorId(), 0));
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private AggregateActivityState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new AggregateActivityState(ctx.getJobletContext()
+                            .getJobId(), new TaskId(getActivityId(), partition));
+                    state.runs = new LinkedList<RunFileReader>();
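+                    /**
+                     * Build the in-memory grouping table; when an insert
+                     * fails for lack of memory, the accumulated groups are
+                     * sorted and flushed into a run file (see nextFrame).
+                     */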
+                    state.gTable = spillableTableFactory.buildSpillableTable(
+                            ctx, keyFields, comparatorFactories,
+                            firstNormalizerFactory, aggregateFactories,
+                            recordDescProvider.getInputRecordDescriptor(
+                                    getOperatorId(), 0), recordDescriptors[0],
+                            ExternalGroupOperatorDescriptor.this.framesLimit);
+                    state.gTable.reset();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer)
+                        throws HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        /**
+                         * If the group table is too large, flush the table into
+                         * a run file.
+                         */
+                        if (!state.gTable.insert(accessor, i)) {
+                            flushFramesToRun();
+                            if (!state.gTable.insert(accessor, i))
+                                throw new HyracksDataException(
+                                        "Failed to insert a record into the grouping table, even after flushing.");
+                        }
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (state.gTable.getFrameCount() > 0
+                            && state.runs.size() > 0) {
+                        /**
+                         * Some runs were already spilled: flush the remaining
+                         * in-memory groups into one more run file, so the
+                         * merge phase only deals with run files.
+                         */
+                        flushFramesToRun();
+                        state.gTable.close();
+                        state.gTable = null;
+                    }
+                    ctx.setTaskState(state);
+                }
+
+                private void flushFramesToRun() throws HyracksDataException {
+                    FileReference runFile;
+                    try {
+                        runFile = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(
+                                        ExternalGroupOperatorDescriptor.class
+                                                .getSimpleName());
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    RunFileWriter writer = new RunFileWriter(runFile,
+                            ctx.getIOManager());
+                    writer.open();
+                    try {
+                        state.gTable.sortFrames();
+                        state.gTable.flushFrames(writer, true);
+                    } catch (Exception ex) {
+                        throw new HyracksDataException(ex);
+                    } finally {
+                        writer.close();
+                    }
+                    state.gTable.reset();
+                    state.runs.add(writer.createReader());
+                }
+            };
+            return op;
+        }
+    }
+
+    private class MergeActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MergeActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions)
+                throws HyracksDataException {
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i]
+                        .createBinaryComparator();
+            }
+            final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
+            final IAggregateState[] aggregateStates = new IAggregateState[mergeFactories.length];
+            for (int i = 0; i < currentWorkingAggregators.length; i++) {
+                currentWorkingAggregators[i] = mergeFactories[i]
+                        .createAggregator(ctx, recordDescriptors[0],
+                                recordDescriptors[0]);
+                aggregateStates[i] = currentWorkingAggregators[i].createState();
+            }
+            final int[] storedKeys = new int[keyFields.length];
+            /**
+             * In stored (pre-aggregated) records the group keys occupy the
+             * leading fields.
+             */
+            for (int i = 0; i < keyFields.length; ++i) {
+                storedKeys[i] = i;
+            }
+            /**
+             * Tuple builder
+             */
+            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+                    recordDescriptors[0].getFields().length);
+
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+                /**
+                 * Input frames, one for each run file.
+                 */
+                private List<ByteBuffer> inFrames;
+
+                /**
+                 * Output frame.
+                 */
+                private ByteBuffer outFrame, writerFrame;
+
+                private LinkedList<RunFileReader> runs;
+
+                private AggregateActivityState aggState;
+
+                /**
+                 * Number of frames to read ahead from each run at a time.
+                 */
+                private int runFrameLimit = 1;
+
+                private int[] currentFrameIndexInRun;
+                private int[] currentRunFrames;
+                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
+                        ctx.getFrameSize());
+                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
+                        ctx.getFrameSize(), recordDescriptors[0]);
+                private ArrayTupleBuilder finalTupleBuilder;
+                private FrameTupleAppender writerFrameAppender;
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    aggState = (AggregateActivityState) ctx
+                            .getTaskState(new TaskId(new ActivityId(
+                                    getOperatorId(), AGGREGATE_ACTIVITY_ID),
+                                    partition));
+                    runs = aggState.runs;
+                    writer.open();
+                    try {
+                        if (runs.size() <= 0) {
+                            ISpillableTable gTable = aggState.gTable;
+                            if (gTable != null) {
+                                if (isOutputSorted)
+                                    gTable.sortFrames();
+                                gTable.flushFrames(writer, false);
+                            }
+                        } else {
+                            runs = new LinkedList<RunFileReader>(runs);
+                            inFrames = new ArrayList<ByteBuffer>();
+                            outFrame = ctx.allocateFrame();
+                            outFrameAppender.reset(outFrame, true);
+                            outFrameAccessor.reset(outFrame);
+                            while (runs.size() > 0) {
+                                try {
+                                    doPass(runs);
+                                } catch (Exception e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+                            inFrames.clear();
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
+                    }
+                }
+
+                private void doPass(LinkedList<RunFileReader> runs)
+                        throws HyracksDataException {
+                    FileReference newRun = null;
+                    IFrameWriter writer = this.writer;
+                    boolean finalPass = false;
+
+                    while (inFrames.size() + 2 < framesLimit) {
+                        inFrames.add(ctx.allocateFrame());
+                    }
+                    int runNumber;
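+                    /**
+                     * One merge pass: if all remaining runs fit within the
+                     * frame budget (two frames are reserved for the output
+                     * path), merge them directly into the downstream writer;
+                     * otherwise merge (framesLimit - 2) runs into a new
+                     * intermediate run file.
+                     */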
+                    if (runs.size() + 2 <= framesLimit) {
+                        finalPass = true;
+                        runFrameLimit = (framesLimit - 2) / runs.size();
+                        runNumber = runs.size();
+                    } else {
+                        runNumber = framesLimit - 2;
+                        newRun = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(
+                                        ExternalGroupOperatorDescriptor.class
+                                                .getSimpleName());
+                        writer = new RunFileWriter(newRun, ctx.getIOManager());
+                        writer.open();
+                    }
+                    try {
+                        currentFrameIndexInRun = new int[runNumber];
+                        currentRunFrames = new int[runNumber];
+                        /**
+                         * Create file readers for each input run file, only for
+                         * the ones fit into the inFrames
+                         */
+                        RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
+                                .size()];
+                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
+                                ctx.getFrameSize(), recordDescriptors[0],
+                                runNumber, comparator);
+                        /**
+                         * current tuple index in each run
+                         */
+                        int[] tupleIndices = new int[runNumber];
+
+                        for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
+                            tupleIndices[runIndex] = 0;
+                            // Load the run file
+                            runFileReaders[runIndex] = runs.get(runIndex);
+                            runFileReaders[runIndex].open();
+
+                            currentRunFrames[runIndex] = 0;
+                            currentFrameIndexInRun[runIndex] = runIndex
+                                    * runFrameLimit;
+                            for (int j = 0; j < runFrameLimit; j++) {
+                                int frameIndex = currentFrameIndexInRun[runIndex]
+                                        + j;
+                                if (runFileReaders[runIndex].nextFrame(inFrames
+                                        .get(frameIndex))) {
+                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(
+                                            ctx.getFrameSize(),
+                                            recordDescriptors[0]);
+                                    tupleAccessors[frameIndex].reset(inFrames
+                                            .get(frameIndex));
+                                    currentRunFrames[runIndex]++;
+                                    if (j == 0)
+                                        setNextTopTuple(runIndex, tupleIndices,
+                                                runFileReaders, tupleAccessors,
+                                                topTuples);
+                                } else {
+                                    break;
+                                }
+                            }
+                        }
+
+                        /**
+                         * Start merging
+                         */
+                        while (!topTuples.areRunsExhausted()) {
+                            /**
+                             * Get the top record
+                             */
+                            ReferenceEntry top = topTuples.peek();
+                            int tupleIndex = top.getTupleIndex();
+                            int runIndex = top.getRunid();
+                            FrameTupleAccessor fta = top.getAccessor();
+
+                            int currentTupleInOutFrame = outFrameAccessor
+                                    .getTupleCount() - 1;
+                            if (currentTupleInOutFrame < 0
+                                    || compareFrameTuples(fta, tupleIndex,
+                                            outFrameAccessor,
+                                            currentTupleInOutFrame) != 0) {
+                                /**
+                                 * A new group starts: reset the tuple builder
+                                 * and initialize a fresh output record.
+                                 */
+                                tupleBuilder.reset();
+                                for (int i = 0; i < keyFields.length; i++) {
+                                    tupleBuilder.addField(fta, tupleIndex, i);
+                                }
+                                for (int i = 0; i < currentWorkingAggregators.length; i++) {
+                                    currentWorkingAggregators[i].init(fta,
+                                            tupleIndex,
+                                            tupleBuilder.getDataOutput(),
+                                            aggregateStates[i]);
+                                    tupleBuilder.addFieldEndOffset();
+                                }
+                                if (!outFrameAppender.append(
+                                        tupleBuilder.getFieldEndOffsets(),
+                                        tupleBuilder.getByteArray(), 0,
+                                        tupleBuilder.getSize())) {
+                                    flushOutFrame(writer, finalPass);
+                                    if (!outFrameAppender.append(
+                                            tupleBuilder.getFieldEndOffsets(),
+                                            tupleBuilder.getByteArray(), 0,
+                                            tupleBuilder.getSize()))
+                                        throw new HyracksDataException(
+                                                "Failed to append an aggregation result to the output frame.");
+                                }
+                            } else {
+                                /**
+                                 * The new tuple belongs to the same group as
+                                 * the last tuple in the output frame: merge it
+                                 * into that partial aggregate in place.
+                                 */
+                                int tupleOffset = outFrameAccessor
+                                        .getTupleStartOffset(currentTupleInOutFrame);
+                                int fieldOffset = outFrameAccessor
+                                        .getFieldStartOffset(
+                                                currentTupleInOutFrame,
+                                                keyFields.length);
+                                for (int i = 0; i < currentWorkingAggregators.length; i++) {
+                                    currentWorkingAggregators[i]
+                                            .aggregate(
+                                                    fta,
+                                                    tupleIndex,
+                                                    outFrameAccessor
+                                                            .getBuffer()
+                                                            .array(),
+                                                    tupleOffset
+                                                            + outFrameAccessor
+                                                                    .getFieldSlotsLength()
+                                                            + fieldOffset,
+                                                    aggregateStates[i]);
+                                }
+                            }
+                            tupleIndices[runIndex]++;
+                            setNextTopTuple(runIndex, tupleIndices,
+                                    runFileReaders, tupleAccessors, topTuples);
+                        }
+
+                        if (outFrameAppender.getTupleCount() > 0) {
+                            flushOutFrame(writer, finalPass);
+                        }
+
+                        for (int i = 0; i < currentWorkingAggregators.length; i++) {
+                            currentWorkingAggregators[i].close();
+                        }
+                        runs.subList(0, runNumber).clear();
+                        /**
+                         * insert the new run file into the beginning of the run
+                         * file list
+                         */
+                        if (!finalPass) {
+                            runs.add(0, ((RunFileWriter) writer).createReader());
+                        }
+                    } finally {
+                        if (!finalPass) {
+                            writer.close();
+                        }
+                    }
+                }
+
+                private void flushOutFrame(IFrameWriter writer, boolean isFinal)
+                        throws HyracksDataException {
+                    if (finalTupleBuilder == null) {
+                        finalTupleBuilder = new ArrayTupleBuilder(
+                                recordDescriptors[0].getFields().length);
+                    }
+                    if (writerFrame == null) {
+                        writerFrame = ctx.allocateFrame();
+                    }
+                    if (writerFrameAppender == null) {
+                        writerFrameAppender = new FrameTupleAppender(
+                                ctx.getFrameSize());
+                        writerFrameAppender.reset(writerFrame, true);
+                    }
+                    outFrameAccessor.reset(outFrame);
+
+                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+                        int tupleOffset = outFrameAccessor
+                                .getTupleStartOffset(i);
+                        finalTupleBuilder.reset();
+                        for (int j = 0; j < keyFields.length; j++) {
+                            finalTupleBuilder.addField(outFrameAccessor, i, j);
+                        }
+                        for (int j = 0; j < currentWorkingAggregators.length; j++) {
+                            int fieldOffset = outFrameAccessor
+                                    .getFieldStartOffset(i, keyFields.length
+                                            + j);
+                            if (isFinal)
+                                currentWorkingAggregators[j].outputFinalResult(
+                                        finalTupleBuilder.getDataOutput(),
+                                        outFrameAccessor.getBuffer().array(),
+                                        tupleOffset
+                                                + outFrameAccessor
+                                                        .getFieldSlotsLength()
+                                                + fieldOffset,
+                                        aggregateStates[j]);
+                            else
+                                currentWorkingAggregators[j]
+                                        .outputPartialResult(
+                                                finalTupleBuilder
+                                                        .getDataOutput(),
+                                                outFrameAccessor.getBuffer()
+                                                        .array(),
+                                                tupleOffset
+                                                        + outFrameAccessor
+                                                                .getFieldSlotsLength()
+                                                        + fieldOffset,
+                                                aggregateStates[j]);
+                            finalTupleBuilder.addFieldEndOffset();
+                        }
+
+                        if (!writerFrameAppender.append(
+                                finalTupleBuilder.getFieldEndOffsets(),
+                                finalTupleBuilder.getByteArray(), 0,
+                                finalTupleBuilder.getSize())) {
+                            FrameUtils.flushFrame(writerFrame, writer);
+                            writerFrameAppender.reset(writerFrame, true);
+                            if (!writerFrameAppender.append(
+                                    finalTupleBuilder.getFieldEndOffsets(),
+                                    finalTupleBuilder.getByteArray(), 0,
+                                    finalTupleBuilder.getSize()))
+                                throw new HyracksDataException(
+                                        "Failed to write final aggregation result to a writer frame!");
+                        }
+                    }
+                    if (writerFrameAppender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(writerFrame, writer);
+                        writerFrameAppender.reset(writerFrame, true);
+                    }
+                    outFrameAppender.reset(outFrame, true);
+                }
+
+                private void setNextTopTuple(int runIndex, int[] tupleIndices,
+                        RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors,
+                        ReferencedPriorityQueue topTuples)
+                        throws HyracksDataException {
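+                    /**
+                     * Advance the given run to its next available tuple: it
+                     * may be in the current frame, in an already-loaded
+                     * read-ahead frame, or require refilling this run's frame
+                     * slots from the run file. An exhausted run is popped
+                     * from the priority queue and closed.
+                     */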
+                    int runStart = runIndex * runFrameLimit;
+                    boolean existNext = false;
+                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
+                            || runCursors[runIndex] == null) {
+                        /**
+                         * run already closed
+                         */
+                        existNext = false;
+                    } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+                        /**
+                         * not the last frame for this run
+                         */
+                        existNext = true;
+                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
+                                .getTupleCount()) {
+                            tupleIndices[runIndex] = 0;
+                            currentFrameIndexInRun[runIndex]++;
+                        }
+                    } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
+                            .getTupleCount()) {
+                        /**
+                         * tuples remain in the last loaded frame of this run
+                         */
+                        existNext = true;
+                    } else {
+                        /**
+                         * All loaded tuples for this run are consumed: refill
+                         * its frame slots from the run file in a batch.
+                         */
+                        int frameOffset = runIndex * runFrameLimit;
+                        tupleIndices[runIndex] = 0;
+                        currentFrameIndexInRun[runIndex] = frameOffset;
+                        /**
+                         * read in batch
+                         */
+                        currentRunFrames[runIndex] = 0;
+                        for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
+                            ByteBuffer buffer = tupleAccessors[frameOffset]
+                                    .getBuffer();
+                            if (runCursors[runIndex].nextFrame(buffer)) {
+                                tupleAccessors[frameOffset].reset(buffer);
+                                if (tupleAccessors[frameOffset].getTupleCount() > 0) {
+                                    existNext = true;
+                                } else {
+                                    throw new IllegalStateException(
+                                            "Invalid state: run file produced an empty frame");
+                                }
+                                currentRunFrames[runIndex]++;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+
+                    if (existNext) {
+                        topTuples
+                                .popAndReplace(
+                                        tupleAccessors[currentFrameIndexInRun[runIndex]],
+                                        tupleIndices[runIndex]);
+                    } else {
+                        topTuples.pop();
+                        closeRun(runIndex, runCursors, tupleAccessors);
+                    }
+                }
+
+                /**
+                 * Close the reader of the given run, if it is still open, and
+                 * null it out so the run is treated as exhausted.
+                 * 
+                 * @param index
+                 * @param runCursors
+                 * @param tupleAccessor
+                 * @throws HyracksDataException
+                 */
+                private void closeRun(int index, RunFileReader[] runCursors,
+                        IFrameTupleAccessor[] tupleAccessor)
+                        throws HyracksDataException {
+                    if (runCursors[index] != null) {
+                        runCursors[index].close();
+                        runCursors[index] = null;
+                    }
+                }
+
+                private int compareFrameTuples(IFrameTupleAccessor fta1,
+                        int j1, IFrameTupleAccessor fta2, int j2) {
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        int fIdx = f;
+                        int s1 = fta1.getTupleStartOffset(j1)
+                                + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldLength(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2)
+                                + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldLength(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            };
+            return op;
+        }
+
+        private Comparator<ReferenceEntry> createEntryComparator(
+                final IBinaryComparator[] comparators) {
+            return new Comparator<ReferenceEntry>() {
+
+                @Override
+                public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
+                            .getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
+                            .getAccessor();
+                    int j1 = o1.getTupleIndex();
+                    int j2 = o2.getTupleIndex();
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        int fIdx = f;
+                        int s1 = fta1.getTupleStartOffset(j1)
+                                + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx)
+                                - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2)
+                                + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx)
+                                - fta2.getFieldStartOffset(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+
+            };
+        }
+
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
new file mode 100644
index 0000000..bdcc2e9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+
+class GroupingHashTable {
+	/**
+	 * The pointers in the link store 3 int values for each entry in the
+	 * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+	 * 
+	 * @author vinayakb
+	 */
+	private static class Link {
+		private static final int INIT_POINTERS_SIZE = 9;
+
+		int[] pointers;
+		int size;
+
+		Link() {
+			pointers = new int[INIT_POINTERS_SIZE];
+			size = 0;
+		}
+
+		void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+			while (size + 3 > pointers.length) {
+				pointers = Arrays.copyOf(pointers, pointers.length * 2);
+			}
+			pointers[size++] = bufferIdx;
+			pointers[size++] = tIndex;
+			pointers[size++] = accumulatorIdx;
+		}
+	}
+
+	private static final int INIT_ACCUMULATORS_SIZE = 8;
+	private final IHyracksTaskContext ctx;
+	private final FrameTupleAppender appender;
+	private final List<ByteBuffer> buffers;
+	private final Link[] table;
+	private IAggregateState[][] aggregateStates;
+	private int accumulatorSize;
+
+	private int lastBIndex;
+	private final int[] fields;
+	private final int[] storedKeys;
+	private final IBinaryComparator[] comparators;
+	private final FrameTuplePairComparator ftpc;
+	private final ITuplePartitionComputer tpc;
+	private final IFieldAggregateDescriptor[] aggregators;
+	private final RecordDescriptor inRecordDescriptor;
+	private final RecordDescriptor outRecordDescriptor;
+
+	private final FrameTupleAccessor storedKeysAccessor;
+
+	GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+			IBinaryComparatorFactory[] comparatorFactories,
+			ITuplePartitionComputerFactory tpcf,
+			IFieldAggregateDescriptorFactory[] aggregatorFactories,
+			RecordDescriptor inRecordDescriptor,
+			RecordDescriptor outRecordDescriptor, int tableSize)
+			throws HyracksDataException {
+		this.ctx = ctx;
+		appender = new FrameTupleAppender(ctx.getFrameSize());
+		buffers = new ArrayList<ByteBuffer>();
+		table = new Link[tableSize];
+		
+		this.aggregateStates = new IAggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
+		accumulatorSize = 0;
+		
+		this.fields = fields;
+		storedKeys = new int[fields.length];
+		@SuppressWarnings("rawtypes")
+		ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+		for (int i = 0; i < fields.length; ++i) {
+			storedKeys[i] = i;
+			storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+		}
+		
+		comparators = new IBinaryComparator[comparatorFactories.length];
+		for (int i = 0; i < comparatorFactories.length; ++i) {
+			comparators[i] = comparatorFactories[i].createBinaryComparator();
+		}
+		ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+		tpc = tpcf.createPartitioner();
+		
+		this.inRecordDescriptor = inRecordDescriptor;
+		this.outRecordDescriptor = outRecordDescriptor;
+		
+		this.aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+		for (int i = 0; i < aggregatorFactories.length; i++) {
+			this.aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+					this.inRecordDescriptor, this.outRecordDescriptor);
+		}
+		RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+				storedKeySerDeser);
+		storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+				storedKeysRecordDescriptor);
+		lastBIndex = -1;
+		addNewBuffer();
+	}
+
+	private void addNewBuffer() {
+		ByteBuffer buffer = ctx.allocateFrame();
+		buffer.position(0);
+		buffer.limit(buffer.capacity());
+		buffers.add(buffer);
+		appender.reset(buffer, true);
+		++lastBIndex;
+	}
+
+	private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+			throws HyracksDataException {
+		ByteBuffer frame = appender.getBuffer();
+		frame.position(0);
+		frame.limit(frame.capacity());
+		writer.nextFrame(appender.getBuffer());
+		appender.reset(appender.getBuffer(), true);
+	}
+
+	void insert(FrameTupleAccessor accessor, int tIndex)
+			throws HyracksDataException {
+		int entry = tpc.partition(accessor, tIndex, table.length);
+		Link link = table[entry];
+		if (link == null) {
+			link = table[entry] = new Link();
+		}
+		int saIndex = -1;
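+		// Probe this bucket's chain for an existing group with an equal key.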
+		for (int i = 0; i < link.size; i += 3) {
+			int sbIndex = link.pointers[i];
+			int stIndex = link.pointers[i + 1];
+			storedKeysAccessor.reset(buffers.get(sbIndex));
+			int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
+			if (c == 0) {
+				saIndex = link.pointers[i + 2];
+				break;
+			}
+		}
+		if (saIndex < 0) {
+			// Did not find the key. Insert a new entry.
+			saIndex = accumulatorSize++;
+			if (!appender.appendProjection(accessor, tIndex, fields)) {
+				addNewBuffer();
+				if (!appender.appendProjection(accessor, tIndex, fields)) {
+					throw new IllegalStateException();
+				}
+			}
+			int sbIndex = lastBIndex;
+			int stIndex = appender.getTupleCount() - 1;
+			for (int i = 0; i < aggregators.length; i++) {
+				IAggregateState aggState = aggregators[i].createState();
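+				// A null DataOutput tells the aggregator to keep its running
+				// value in aggState rather than materializing it into a record.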
+				aggregators[i].init(accessor, tIndex, null, aggState);
+				if (saIndex >= aggregateStates[i].length) {
+					aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
+							aggregateStates[i].length * 2);
+				}
+				aggregateStates[i][saIndex] = aggState;
+			}
+			link.add(sbIndex, stIndex, saIndex);
+		} else {
+			for (int i = 0; i < aggregators.length; i++) {
+				aggregators[i].aggregate(accessor, tIndex, null, 0,
+						aggregateStates[i][saIndex]);
+			}
+		}
+	}
+
+	void write(IFrameWriter writer) throws HyracksDataException {
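+		// Emit one output tuple per group: the stored key fields followed by
+		// one final aggregate value per field aggregator.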
+		ByteBuffer buffer = ctx.allocateFrame();
+		appender.reset(buffer, true);
+		ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+				outRecordDescriptor.getFields().length);
+		for (int i = 0; i < table.length; ++i) {
+			Link link = table[i];
+			if (link != null) {
+				for (int j = 0; j < link.size; j += 3) {
+					int bIndex = link.pointers[j];
+					int tIndex = link.pointers[j + 1];
+					int aIndex = link.pointers[j + 2];
+					ByteBuffer keyBuffer = buffers.get(bIndex);
+					storedKeysAccessor.reset(keyBuffer);
+
+					tupleBuilder.reset();
+					for (int k : this.fields) {
+						tupleBuilder.addField(storedKeysAccessor, tIndex, k);
+					}
+					for (int k = 0; k < aggregators.length; k++) {
+						aggregators[k].outputFinalResult(
+								tupleBuilder.getDataOutput(), null, 0,
+								aggregateStates[k][aIndex]);
+						tupleBuilder.addFieldEndOffset();
+					}
+					while (!appender.append(tupleBuilder.getFieldEndOffsets(),
+							tupleBuilder.getByteArray(), 0,
+							tupleBuilder.getSize())) {
+						flushFrame(appender, writer);
+					}
+				}
+			}
+		}
+		if (appender.getTupleCount() != 0) {
+			flushFrame(appender, writer);
+		}
+	}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..1b46bdd
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * In-memory hash-based group operator: a hash-build activity accumulates
+ * per-group aggregate states in a {@link GroupingHashTable}, and an output
+ * activity emits the aggregated groups once the input is fully consumed.
+ */
+public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private static final int HASH_BUILD_ACTIVITY_ID = 0;
+
+    private static final int OUTPUT_ACTIVITY_ID = 1;
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] keys;
+    private final ITuplePartitionComputerFactory tpcf;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+
+    private final int tableSize;
+
+    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
+            ITuplePartitionComputerFactory tpcf,
+            IBinaryComparatorFactory[] comparatorFactories,
+            IFieldAggregateDescriptorFactory[] aggregatorFactories,
+            RecordDescriptor outRecordDescriptor, int tableSize) {
+        super(spec, 1, 1);
+        this.keys = keys;
+        this.tpcf = tpcf;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactories = aggregatorFactories;
+        recordDescriptors[0] = outRecordDescriptor;
+        this.tableSize = tableSize;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+     */
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
+                HASH_BUILD_ACTIVITY_ID));
+        builder.addActivity(ha);
+
+        OutputActivity oa = new OutputActivity(new ActivityId(odId,
+                OUTPUT_ACTIVITY_ID));
+        builder.addActivity(oa);
+
+        builder.addSourceEdge(0, ha, 0);
+        builder.addTargetEdge(0, oa, 0);
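+        // The output activity may start only after the hash table is fully
+        // built (blocking edge).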
+        builder.addBlockingEdge(ha, oa);
+    }
+
+    public static class HashBuildActivityState extends AbstractTaskState {
+        private GroupingHashTable table;
+
+        public HashBuildActivityState() {
+        }
+
+        private HashBuildActivityState(JobId jobId, TaskId tId) {
+            super(jobId, tId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class HashBuildActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public HashBuildActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions) {
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(
+                    ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(
+                            getOperatorId(), 0));
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
+                private HashBuildActivityState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new HashBuildActivityState(ctx.getJobletContext()
+                            .getJobId(), new TaskId(getActivityId(), partition));
+                    state.table = new GroupingHashTable(ctx, keys,
+                            comparatorFactories, tpcf, aggregatorFactories,
+                            recordDescProvider.getInputRecordDescriptor(
+                                    getOperatorId(), 0), recordDescriptors[0],
+                            tableSize);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer)
+                        throws HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; ++i) {
+                        try {
+                            state.table.insert(accessor, i);
+                        } catch (Exception e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    ctx.setTaskState(state);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    throw new HyracksDataException(
+                            "HashGroupOperator failed.");
+                }
+            };
+        }
+    }
+
+    private class OutputActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public OutputActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions) {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+                @Override
+                public void initialize() throws HyracksDataException {
+                    HashBuildActivityState buildState = (HashBuildActivityState) ctx
+                            .getTaskState(new TaskId(new ActivityId(
+                                    getOperatorId(), HASH_BUILD_ACTIVITY_ID),
+                                    partition));
+                    GroupingHashTable table = buildState.table;
+                    writer.open();
+                    try {
+                        table.write(writer);
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
+                    }
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
new file mode 100644
index 0000000..332e63e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -0,0 +1,589 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * A factory for hash-based spillable tables that group input tuples in
+ * memory, and can sort and flush their contents when the frame budget is
+ * exhausted.
+ */
+public class HashSpillableTableFactory implements ISpillableTableFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePartitionComputerFactory tpcf;
+    private final int tableSize;
+
+    public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf,
+            int tableSize) {
+        this.tpcf = tpcf;
+        this.tableSize = tableSize;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
+     * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
+     * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
+     * edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory[],
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
+     */
+    @Override
+    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx,
+            final int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IFieldAggregateDescriptorFactory[] aggregatorFactories,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int framesLimit)
+            throws HyracksDataException {
+        final int[] storedKeys = new int[keyFields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
+        for (int i = 0; i < keyFields.length; i++) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+        }
+
+        RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+        final FrameTupleAccessor storedKeysAccessor1;
+        final FrameTupleAccessor storedKeysAccessor2;
+        if (keyFields.length >= outRecordDescriptor.getFields().length) {
+            // for the case of zero-aggregations
+            ISerializerDeserializer<?>[] fields = outRecordDescriptor
+                    .getFields();
+            ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
+            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+            for (int i = 0; i < fields.length; i++)
+                newFields[i] = fields[i];
+            ITypeTrait[] newTypes = null;
+            if (types != null) {
+                newTypes = new ITypeTrait[types.length + 1];
+                for (int i = 0; i < types.length; i++)
+                    newTypes[i] = types[i];
+            }
+            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+        }
+        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
+                internalRecordDescriptor);
+        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
+                internalRecordDescriptor);
+
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
+                keyFields, storedKeys, comparators);
+        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
+                storedKeys, storedKeys, comparators);
+        final FrameTupleAppender appender = new FrameTupleAppender(
+                ctx.getFrameSize());
+        final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+        final ByteBuffer outFrame = ctx.allocateFrame();
+
+        final ArrayTupleBuilder internalTupleBuilder;
+        if (keyFields.length < outRecordDescriptor.getFields().length)
+            internalTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length);
+        else
+            internalTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length + 1);
+        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
+                outRecordDescriptor.getFields().length);
+        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
+                : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+        final IAggregateState[] aggregateStates = new IAggregateState[aggregatorFactories.length];
+        for (int i = 0; i < aggregators.length; i++) {
+            aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+                    inRecordDescriptor, outRecordDescriptor);
+            aggregateStates[i] = aggregators[i].createState();
+        }
+
+        return new ISpillableTable() {
+
+            private int dataFrameCount;
+            private final ISerializableTable table = new SerializableHashTable(
+                    tableSize, ctx);
+            private final TuplePointer storedTuplePointer = new TuplePointer();
+            private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+
+            /**
+             * Each tuple is referenced by 3 entries in the tPointers array:
+             * [0] = index of the hash table bucket holding the tuple, [1] =
+             * position of the tuple within that bucket, and [2] = poor man's
+             * normalized key for the tuple.
+             */
+            private int[] tPointers;
+
+            @Override
+            public void sortFrames() {
+                int sfIdx = storedKeys[0];
+                int totalTCount = table.getTupleCount();
+                tPointers = new int[totalTCount * 3];
+                int ptr = 0;
+
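+                // Walk every bucket, recording (bucket, position-in-bucket,
+                // normalized key) for each stored group.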
+                for (int i = 0; i < tableSize; i++) {
+                    int entry = i;
+                    int offset = 0;
+                    do {
+                        table.getTuplePointer(entry, offset, storedTuplePointer);
+                        if (storedTuplePointer.frameIndex < 0)
+                            break;
+                        tPointers[ptr * 3] = entry;
+                        tPointers[ptr * 3 + 1] = offset;
+                        int fIndex = storedTuplePointer.frameIndex;
+                        int tIndex = storedTuplePointer.tupleIndex;
+                        storedKeysAccessor1.reset(frames.get(fIndex));
+                        int tStart = storedKeysAccessor1
+                                .getTupleStartOffset(tIndex);
+                        int f0StartRel = storedKeysAccessor1
+                                .getFieldStartOffset(tIndex, sfIdx);
+                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(
+                                tIndex, sfIdx);
+                        int f0Start = f0StartRel + tStart
+                                + storedKeysAccessor1.getFieldSlotsLength();
+                        tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc
+                                .normalize(storedKeysAccessor1.getBuffer()
+                                        .array(), f0Start, f0EndRel
+                                        - f0StartRel);
+                        ptr++;
+                        offset++;
+                    } while (true);
+                }
+                /**
+                 * Sort using quick sort
+                 */
+                if (tPointers.length > 0) {
+                    sort(tPointers, 0, totalTCount);
+                }
+            }
+
+            @Override
+            public void reset() {
+                dataFrameCount = -1;
+                tPointers = null;
+                table.reset();
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].close();
+                }
+            }
+
+            @Override
+            public boolean insert(FrameTupleAccessor accessor, int tIndex)
+                    throws HyracksDataException {
+                if (dataFrameCount < 0)
+                    nextAvailableFrame();
+                int entry = tpc.partition(accessor, tIndex, tableSize);
+                boolean foundGroup = false;
+                int offset = 0;
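+                // Probe the bucket for this tuple's group: stop at the first
+                // matching group, or at the end of the bucket.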
+                do {
+                    table.getTuplePointer(entry, offset++, storedTuplePointer);
+                    if (storedTuplePointer.frameIndex < 0)
+                        break;
+                    storedKeysAccessor1.reset(frames
+                            .get(storedTuplePointer.frameIndex));
+                    int c = ftpcPartial.compare(accessor, tIndex,
+                            storedKeysAccessor1, storedTuplePointer.tupleIndex);
+                    if (c == 0) {
+                        foundGroup = true;
+                        break;
+                    }
+                } while (true);
+
+                if (!foundGroup) {
+                    /**
+                     * If no matching group is found, create a new aggregator
+                     * Create a tuple for the new group
+                     */
+                    internalTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        internalTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].init(accessor, tIndex,
+                                internalTupleBuilder.getDataOutput(),
+                                aggregateStates[i]);
+                        internalTupleBuilder.addFieldEndOffset();
+                    }
+                    if (!appender.append(
+                            internalTupleBuilder.getFieldEndOffsets(),
+                            internalTupleBuilder.getByteArray(), 0,
+                            internalTupleBuilder.getSize())) {
+                        if (!nextAvailableFrame()) {
+                            return false;
+                        } else {
+                            if (!appender.append(
+                                    internalTupleBuilder.getFieldEndOffsets(),
+                                    internalTupleBuilder.getByteArray(), 0,
+                                    internalTupleBuilder.getSize())) {
+                                throw new IllegalStateException(
+                                        "Failed to append an initialized group to an empty frame: the record is too large.");
+                            }
+                        }
+                    }
+
+                    storedTuplePointer.frameIndex = dataFrameCount;
+                    storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+                    table.insert(entry, storedTuplePointer);
+                } else {
+                    // If there is a matching found, do aggregation directly
+                    int tupleOffset = storedKeysAccessor1
+                            .getTupleStartOffset(storedTuplePointer.tupleIndex);
+
+                    for (int i = 0; i < aggregators.length; i++) {
+                        int aggFieldOffset = storedKeysAccessor1
+                                .getFieldStartOffset(
+                                        storedTuplePointer.tupleIndex,
+                                        keyFields.length + i);
+                        aggregators[i].aggregate(
+                                accessor,
+                                tIndex,
+                                storedKeysAccessor1.getBuffer().array(),
+                                tupleOffset
+                                        + storedKeysAccessor1
+                                                .getFieldSlotsLength()
+                                        + aggFieldOffset, aggregateStates[i]);
+                    }
+                }
+                return true;
+            }
+
+            @Override
+            public List<ByteBuffer> getFrames() {
+                return frames;
+            }
+
+            @Override
+            public int getFrameCount() {
+                return dataFrameCount;
+            }
+
+            @Override
+            public void flushFrames(IFrameWriter writer, boolean isPartial)
+                    throws HyracksDataException {
+                FrameTupleAppender appender = new FrameTupleAppender(
+                        ctx.getFrameSize());
+                writer.open();
+                appender.reset(outFrame, true);
+                if (tPointers == null) {
+                    // Not sorted
+                    for (int i = 0; i < tableSize; ++i) {
+                        int entry = i;
+                        int offset = 0;
+                        do {
+                            table.getTuplePointer(entry, offset++,
+                                    storedTuplePointer);
+                            if (storedTuplePointer.frameIndex < 0)
+                                break;
+                            int bIndex = storedTuplePointer.frameIndex;
+                            int tIndex = storedTuplePointer.tupleIndex;
+
+                            storedKeysAccessor1.reset(frames.get(bIndex));
+                            int tupleOffset = storedKeysAccessor1
+                                    .getTupleStartOffset(tIndex);
+                            // Reset the tuple for the partial result
+                            outputTupleBuilder.reset();
+                            for (int k = 0; k < keyFields.length; k++) {
+                                outputTupleBuilder.addField(
+                                        storedKeysAccessor1, tIndex, k);
+                            }
+                            for (int k = 0; k < aggregators.length; k++) {
+                                int fieldStart = storedKeysAccessor1
+                                        .getFieldStartOffset(tIndex,
+                                                keyFields.length + k);
+                                if (isPartial)
+                                    aggregators[k]
+                                            .outputPartialResult(
+                                                    outputTupleBuilder
+                                                            .getDataOutput(),
+                                                    storedKeysAccessor1
+                                                            .getBuffer()
+                                                            .array(),
+                                                    tupleOffset
+                                                            + storedKeysAccessor1
+                                                                    .getFieldSlotsLength()
+                                                            + fieldStart,
+                                                    aggregateStates[k]);
+                                else
+                                    aggregators[k]
+                                            .outputFinalResult(
+                                                    outputTupleBuilder
+                                                            .getDataOutput(),
+                                                    storedKeysAccessor1
+                                                            .getBuffer()
+                                                            .array(),
+                                                    tupleOffset
+                                                            + storedKeysAccessor1
+                                                                    .getFieldSlotsLength()
+                                                            + fieldStart,
+                                                    aggregateStates[k]);
+                                outputTupleBuilder.addFieldEndOffset();
+                            }
+
+                            while (!appender.append(
+                                    outputTupleBuilder.getFieldEndOffsets(),
+                                    outputTupleBuilder.getByteArray(), 0,
+                                    outputTupleBuilder.getSize())) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                appender.reset(outFrame, true);
+                            }
+                        } while (true);
+                    }
+                    if (appender.getTupleCount() != 0) {
+                        FrameUtils.flushFrame(outFrame, writer);
+                    }
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].close();
+                    }
+                    return;
+                }
+                int n = tPointers.length / 3;
+                for (int ptr = 0; ptr < n; ptr++) {
+                    int tableIndex = tPointers[ptr * 3];
+                    int rowIndex = tPointers[ptr * 3 + 1];
+                    table.getTuplePointer(tableIndex, rowIndex,
+                            storedTuplePointer);
+                    int frameIndex = storedTuplePointer.frameIndex;
+                    int tupleIndex = storedTuplePointer.tupleIndex;
+                    // Get the frame containing the value
+                    ByteBuffer buffer = frames.get(frameIndex);
+                    storedKeysAccessor1.reset(buffer);
+
+                    int tupleOffset = storedKeysAccessor1
+                            .getTupleStartOffset(tupleIndex);
+
+                    outputTupleBuilder.reset();
+                    for (int k = 0; k < keyFields.length; k++) {
+                        outputTupleBuilder.addField(storedKeysAccessor1,
+                                tupleIndex, k);
+                    }
+                    for (int k = 0; k < aggregators.length; k++) {
+                        int fieldStart = storedKeysAccessor1
+                                .getFieldStartOffset(tupleIndex,
+                                        keyFields.length + k);
+                        if (isPartial)
+                            aggregators[k].outputPartialResult(
+                                    outputTupleBuilder.getDataOutput(),
+                                    storedKeysAccessor1.getBuffer().array(),
+                                    tupleOffset
+                                            + storedKeysAccessor1
+                                                    .getFieldSlotsLength()
+                                            + fieldStart, aggregateStates[k]);
+                        else
+                            aggregators[k].outputFinalResult(outputTupleBuilder
+                                    .getDataOutput(), storedKeysAccessor1
+                                    .getBuffer().array(), tupleOffset
+                                    + storedKeysAccessor1.getFieldSlotsLength()
+                                    + fieldStart, aggregateStates[k]);
+                        outputTupleBuilder.addFieldEndOffset();
+                    }
+                    if (!appender.append(
+                            outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0,
+                            outputTupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outFrame, writer);
+                        appender.reset(outFrame, true);
+                        if (!appender.append(
+                                outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0,
+                                outputTupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to append a group to an empty output frame.");
+                        }
+                    }
+                }
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                }
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].close();
+                }
+            }
+
+            @Override
+            public void close() {
+                dataFrameCount = -1;
+                tPointers = null;
+                table.close();
+                frames.clear();
+            }
+
+            /**
+             * Advance the working frame to the next available frame in the
+             * frame list. There are two cases:<br>
+             * 1) If the next frame has not been allocated yet, allocate a new
+             * frame. 2) Otherwise, recycle an already-allocated frame.
+             * 
+             * @return whether a next frame is available within the frame
+             *         limit.
+             */
+            private boolean nextAvailableFrame() {
+                // Return false if the number of frames is equal to the limit.
+                if (dataFrameCount + 1 >= framesLimit)
+                    return false;
+
+                if (frames.size() < framesLimit) {
+                    // Insert a new frame
+                    ByteBuffer frame = ctx.allocateFrame();
+                    frame.position(0);
+                    frame.limit(frame.capacity());
+                    frames.add(frame);
+                    appender.reset(frame, true);
+                    dataFrameCount = frames.size() - 1;
+                } else {
+                    // Reuse an old frame
+                    dataFrameCount++;
+                    ByteBuffer frame = frames.get(dataFrameCount);
+                    frame.position(0);
+                    frame.limit(frame.capacity());
+                    appender.reset(frame, true);
+                }
+                return true;
+            }
+
+            private void sort(int[] tPointers, int offset, int length) {
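+                // 3-way quicksort over (normalized key, full key): the middle
+                // element is the pivot; keys equal to the pivot are swapped to
+                // the ends and regrouped around the middle before recursing.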
+                int m = offset + (length >> 1);
+                int mTable = tPointers[m * 3];
+                int mRow = tPointers[m * 3 + 1];
+                int mNormKey = tPointers[m * 3 + 2];
+
+                table.getTuplePointer(mTable, mRow, storedTuplePointer);
+                int mFrame = storedTuplePointer.frameIndex;
+                int mTuple = storedTuplePointer.tupleIndex;
+                storedKeysAccessor1.reset(frames.get(mFrame));
+
+                int a = offset;
+                int b = a;
+                int c = offset + length - 1;
+                int d = c;
+                while (true) {
+                    while (b <= c) {
+                        int bTable = tPointers[b * 3];
+                        int bRow = tPointers[b * 3 + 1];
+                        int bNormKey = tPointers[b * 3 + 2];
+                        int cmp = 0;
+                        if (bNormKey != mNormKey) {
+                            cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+                                    : 1;
+                        } else {
+                            table.getTuplePointer(bTable, bRow,
+                                    storedTuplePointer);
+                            int bFrame = storedTuplePointer.frameIndex;
+                            int bTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(bFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2,
+                                    bTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp > 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, a++, b);
+                        }
+                        ++b;
+                    }
+                    while (c >= b) {
+                        int cTable = tPointers[c * 3];
+                        int cRow = tPointers[c * 3 + 1];
+                        int cNormKey = tPointers[c * 3 + 2];
+                        int cmp = 0;
+                        if (cNormKey != mNormKey) {
+                            cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+                                    : 1;
+                        } else {
+                            table.getTuplePointer(cTable, cRow,
+                                    storedTuplePointer);
+                            int cFrame = storedTuplePointer.frameIndex;
+                            int cTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(cFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2,
+                                    cTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp < 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, c, d--);
+                        }
+                        --c;
+                    }
+                    if (b > c)
+                        break;
+                    swap(tPointers, b++, c--);
+                }
+
+                int s;
+                int n = offset + length;
+                s = Math.min(a - offset, b - a);
+                vecswap(tPointers, offset, b - s, s);
+                s = Math.min(d - c, n - d - 1);
+                vecswap(tPointers, b, n - s, s);
+
+                if ((s = b - a) > 1) {
+                    sort(tPointers, offset, s);
+                }
+                if ((s = d - c) > 1) {
+                    sort(tPointers, n - s, s);
+                }
+            }
+
+            private void swap(int x[], int a, int b) {
+                for (int i = 0; i < 3; ++i) {
+                    int t = x[a * 3 + i];
+                    x[a * 3 + i] = x[b * 3 + i];
+                    x[b * 3 + i] = t;
+                }
+            }
+
+            private void vecswap(int x[], int a, int b, int n) {
+                for (int i = 0; i < n; i++, a++, b++) {
+                    swap(x, a, b);
+                }
+            }
+
+        };
+    }
+
+}
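
Note on usage: wiring the factory into an operator looks roughly like the sketch below. This is an illustration only, not part of the patch: tpcf (any ITuplePartitionComputerFactory over the key fields), ctx, comparatorFactories, the record descriptors, and framesLimit are assumed to be in scope, and IntSumAggregatorFactory comes from the aggregators subpackage added later in this diff.

    // Sketch: build a spillable group table for GROUP BY field 0, SUM(field 1).
    int[] keyFields = new int[] { 0 };
    ISpillableTableFactory tableFactory = new HashSpillableTableFactory(tpcf, 8191);
    ISpillableTable table = tableFactory.buildSpillableTable(ctx, keyFields,
            comparatorFactories, // one IBinaryComparatorFactory per key field
            null,                // no normalized-key computer; sorting falls back to full comparisons
            new IFieldAggregateDescriptorFactory[] { new IntSumAggregatorFactory(1) },
            inRecordDescriptor, outRecordDescriptor, framesLimit);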
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
new file mode 100644
index 0000000..169ada7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateState.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+/**
+ * The state of a single field aggregator. Depending on the aggregator, the
+ * state either resides in a frame as raw bytes, or is kept as a Java object
+ * accessed through {@link #getState()} and {@link #setState(Object)}.
+ */
+public interface IAggregateState extends Serializable {
+
+    /**
+     * Return the length, in bytes, of this state when it is stored in a
+     * frame.
+     * 
+     * @return the serialized length of the state in bytes
+     */
+    public int getLength();
+
+    /**
+     * Return the state as a Java object.
+     * 
+     * @return the current state object
+     */
+    public Object getState();
+
+    /**
+     * Set the state.
+     * 
+     * @param obj
+     *            the new state object
+     */
+    public void setState(Object obj);
+
+    /**
+     * Reset the state.
+     */
+    public void reset();
+}
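
For the java-object-based mode the state is just a mutable holder. A minimal sketch (the boxed Integer payload is an assumption for illustration, mirroring the inline states in the aggregator factories below):

    // Sketch: an object-backed IAggregateState holding one boxed int value.
    IAggregateState state = new IAggregateState() {
        private static final long serialVersionUID = 1L;
        private Object value;

        @Override
        public int getLength() {
            return 4; // length when serialized into a frame: one 4-byte int
        }

        @Override
        public Object getState() {
            return value;
        }

        @Override
        public void setState(Object obj) {
            value = obj;
        }

        @Override
        public void reset() {
            value = null;
        }
    };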
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
new file mode 100644
index 0000000..06d5f68
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Aggregation logic for a single output field. An implementation may keep its
+ * running state inside the frame (written through the provided DataOutput) or
+ * as a Java object inside the {@link IAggregateState}.
+ */
+public interface IFieldAggregateDescriptor {
+
+    /**
+     * Create an aggregate state
+     * 
+     * @return
+     */
+    public IAggregateState createState();
+
+    /**
+     * Initialize the state based on the input tuple.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param fieldOutput
+     *            The data output for the frame containing the state. This may
+     *            be null, if the state is maintained as a java object
+     * @param state
+     *            The state to be initialized.
+     * @throws HyracksDataException
+     */
+    public void init(IFrameTupleAccessor accessor, int tIndex,
+            DataOutput fieldOutput, IAggregateState state)
+            throws HyracksDataException;
+
+    /**
+     * Reset the aggregator. The corresponding aggregate state should be reset
+     * too. Note that here the frame is not an input argument, since it can be
+     * reset outside of the aggregator (simply reset the starting index of the
+     * buffer).
+     * 
+     * @param state
+     */
+    public void reset(IAggregateState state);
+
+    /**
+     * Aggregate the value. Aggregate state should be updated correspondingly.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param data
+     *            The buffer containing the state, if frame-based-state is used.
+     *            This means that it can be null if java-object-based-state is
+     *            used.
+     * @param offset
+     * @param state
+     *            The aggregate state.
+     * @throws HyracksDataException
+     */
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+            byte[] data, int offset, IAggregateState state)
+            throws HyracksDataException;
+
+    /**
+     * Output the partial aggregation result.
+     * 
+     * @param fieldOutput
+     *            The data output for the output frame
+     * @param data
+     *            The buffer containing the aggregation state
+     * @param offset
+     * @param state
+     *            The aggregation state.
+     * @throws HyracksDataException
+     */
+    public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+            int offset, IAggregateState state) throws HyracksDataException;
+
+    /**
+     * Output the final aggregation result.
+     * 
+     * @param fieldOutput
+     *            The data output for the output frame
+     * @param data
+     *            The buffer containing the aggregation state
+     * @param offset
+     * @param state
+     *            The aggregation state.
+     * @throws HyracksDataException
+     */
+    public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+            int offset, IAggregateState state) throws HyracksDataException;
+
+    public void close();
+
+}
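
To make the lifecycle concrete (createState once per field, init when a group is created, aggregate for every further tuple of the group, then one of the output methods), here is a minimal frame-backed COUNT descriptor. It is a sketch under the interface above, not code from this patch; it assumes IntegerSerializerDeserializer, java.nio.ByteBuffer, and the usual java.io imports:

    // Sketch: a frame-backed COUNT aggregator; its whole state is one int in the frame.
    IFieldAggregateDescriptor count = new IFieldAggregateDescriptor() {
        @Override
        public IAggregateState createState() {
            return null; // state lives entirely in the frame; no object state is kept
        }

        @Override
        public void init(IFrameTupleAccessor accessor, int tIndex,
                DataOutput fieldOutput, IAggregateState state) throws HyracksDataException {
            try {
                fieldOutput.writeInt(1); // the new group contains this one tuple
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }

        @Override
        public void reset(IAggregateState state) {
            // nothing to reset: no object state
        }

        @Override
        public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data,
                int offset, IAggregateState state) throws HyracksDataException {
            int cnt = IntegerSerializerDeserializer.getInt(data, offset) + 1;
            ByteBuffer.wrap(data).putInt(offset, cnt); // update the count in place
        }

        @Override
        public void outputPartialResult(DataOutput fieldOutput, byte[] data,
                int offset, IAggregateState state) throws HyracksDataException {
            try {
                fieldOutput.writeInt(IntegerSerializerDeserializer.getInt(data, offset));
            } catch (IOException e) {
                throw new HyracksDataException(e);
            }
        }

        @Override
        public void outputFinalResult(DataOutput fieldOutput, byte[] data,
                int offset, IAggregateState state) throws HyracksDataException {
            outputPartialResult(fieldOutput, data, offset, state); // same for COUNT
        }

        @Override
        public void close() {
        }
    };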
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
new file mode 100644
index 0000000..0e2d530
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A serializable factory for {@link IFieldAggregateDescriptor} instances, so
+ * that aggregators can be created at the task site.
+ */
+public interface IFieldAggregateDescriptorFactory extends Serializable {
+
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
new file mode 100644
index 0000000..cc8cbd2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public interface ISpillableTable {
+
+    public void close();
+
+    public void reset();
+
+    public int getFrameCount();
+
+    public List<ByteBuffer> getFrames();
+
+    public void sortFrames();
+
+    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+    public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
+}
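
The intended calling pattern, roughly (a sketch assuming table, accessor, and a run writer runWriter are in scope; the external group-by operator in this patch drives the table this way):

    // Sketch: insert tuples until memory runs out, then spill a sorted run.
    accessor.reset(buffer);
    for (int i = 0, n = accessor.getTupleCount(); i < n; ++i) {
        if (!table.insert(accessor, i)) {
            table.sortFrames();                 // order the in-memory groups
            table.flushFrames(runWriter, true); // true: emit partial results
            table.reset();
            if (!table.insert(accessor, i)) {
                throw new HyracksDataException("Tuple does not fit in an empty table");
            }
        }
    }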
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
new file mode 100644
index 0000000..6420766
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISpillableTableFactory extends Serializable {
+    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
+            IFieldAggregateDescriptorFactory[] aggregatorFactories, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
new file mode 100644
index 0000000..886dd06
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * A field aggregator computing the average of an integer field. The running
+ * and partial state is a (sum, count) pair of 4-byte integers; the final
+ * result is emitted as a float.
+ */
+public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
+    
+    private static final long serialVersionUID = 1L;
+    
+    private final int aggField;
+    
+    public AvgAggregatorFactory(int aggField){
+        this.aggField = aggField;
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        
+        return new IFieldAggregateDescriptor() {
+            
+            @Override
+            public void reset(IAggregateState state) {
+                state.reset();
+            }
+            
+            @Override
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state) throws HyracksDataException {
+                int sum, count;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum = fields[0];
+                    count = fields[1];
+                }
+                try {
+                    fieldOutput.writeInt(sum);
+                    fieldOutput.writeInt(count);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state) throws HyracksDataException {
+                int sum, count;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum = fields[0];
+                    count = fields[1];
+                }
+                try {
+                    fieldOutput.writeFloat((float)sum/count);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex,
+                    DataOutput fieldOutput, IAggregateState state)
+                    throws HyracksDataException {
+                int sum = 0;
+                int count = 1; // the new group starts with this first tuple
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                if (fieldOutput != null) {
+                    try {
+                        fieldOutput.writeInt(sum);
+                        fieldOutput.writeInt(count);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(
+                                "I/O exception when initializing the aggregator.");
+                    }
+                } else {
+                    // Store an Integer[] to match the (Integer[]) casts above.
+                    state.setState(new Integer[] { sum, count });
+                }
+            }
+            
+            @Override
+            public IAggregateState createState() {
+                return new IAggregateState() {
+                    
+                    private static final long serialVersionUID = 1L;
+                    
+                    Object state = null;
+                    
+                    @Override
+                    public void setState(Object obj) {
+                        state = obj;
+                    }
+                    
+                    @Override
+                    public void reset() {
+                        state = null;
+                    }
+                    
+                    @Override
+                    public Object getState() {
+                        return state;
+                    }
+                    
+                    @Override
+                    public int getLength() {
+                        return 8;
+                    }
+                };
+            }
+            
+            @Override
+            public void close() {
+                // Nothing to release.
+            }
+            
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
+                int sum = 0, count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                count += 1;
+                if (data != null) {
+                    ByteBuffer buf = ByteBuffer.wrap(data);
+                    sum += buf.getInt(offset);
+                    count += buf.getInt(offset + 4);
+                    buf.putInt(offset, sum);
+                    buf.putInt(offset + 4, count);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum += fields[0];
+                    count += fields[1];
+                    state.setState(new Integer[] { sum, count });
+                }
+            }
+        };
+    }
+
+}
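
Both avg factories agree on the partial layout: two 4-byte ints, sum then count (hence getLength() == 8). Decoding a frame-resident partial, as a quick reference:

    // Sketch: decode an avg partial stored at 'offset' in a frame's byte array.
    int sum = IntegerSerializerDeserializer.getInt(data, offset);
    int count = IntegerSerializerDeserializer.getInt(data, offset + 4);
    float avg = (float) sum / count; // what outputFinalResult emits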
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
new file mode 100644
index 0000000..6760544
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * A field aggregator that merges the (sum, count) average partials produced
+ * by {@link AvgAggregatorFactory}, emitting either a merged partial or the
+ * final float average.
+ */
+public class AvgMergeAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int aggField;
+    
+    public AvgMergeAggregatorFactory(int aggField){
+        this.aggField = aggField;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        return new IFieldAggregateDescriptor() {
+            
+            @Override
+            public void reset(IAggregateState state) {
+                state.reset();
+            }
+            
+            @Override
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state) throws HyracksDataException {
+                int sum, count;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum = fields[0];
+                    count = fields[1];
+                }
+                try {
+                    fieldOutput.writeInt(sum);
+                    fieldOutput.writeInt(count);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state) throws HyracksDataException {
+                int sum, count;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum = fields[0];
+                    count = fields[1];
+                }
+                try {
+                    fieldOutput.writeFloat((float)sum/count);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex,
+                    DataOutput fieldOutput, IAggregateState state)
+                    throws HyracksDataException {
+                int sum = 0;
+                int count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                count += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart + 4);
+                if (fieldOutput != null) {
+                    try {
+                        fieldOutput.writeInt(sum);
+                        fieldOutput.writeInt(count);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(
+                                "I/O exception when initializing the aggregator.");
+                    }
+                } else {
+                    // Store an Integer[] to match the (Integer[]) casts above.
+                    state.setState(new Integer[] { sum, count });
+                }
+            }
+            
+            @Override
+            public IAggregateState createState() {
+                return new IAggregateState() {
+                    
+                    private static final long serialVersionUID = 1L;
+                    
+                    Object state = null;
+                    
+                    @Override
+                    public void setState(Object obj) {
+                        state = obj;
+                    }
+                    
+                    @Override
+                    public void reset() {
+                        state = null;
+                    }
+                    
+                    @Override
+                    public Object getState() {
+                        return state;
+                    }
+                    
+                    @Override
+                    public int getLength() {
+                        return 8;
+                    }
+                };
+            }
+            
+            @Override
+            public void close() {
+                // Nothing to release.
+            }
+            
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
+                int sum = 0, count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                // Merge the partial's count (stored after the 4-byte sum),
+                // rather than counting input tuples, as init() above does.
+                count += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart + 4);
+                if (data != null) {
+                    ByteBuffer buf = ByteBuffer.wrap(data);
+                    sum += buf.getInt(offset);
+                    count += buf.getInt(offset + 4);
+                    buf.putInt(offset, sum);
+                    buf.putInt(offset + 4, count);
+                } else {
+                    Integer[] fields = (Integer[])state.getState();
+                    sum += fields[0];
+                    count += fields[1];
+                    state.setState(new Integer[] { sum, count });
+                }
+            }
+        };
+    }
+
+}
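
The two factories are meant to be paired across phases: AvgAggregatorFactory consumes raw input tuples, and AvgMergeAggregatorFactory re-aggregates the (sum, count) partials it wrote. A sketch of the pairing; the merge-side field index (the partial sits right after the group keys) is an assumption for illustration:

    // Sketch: build-phase vs. merge-phase aggregator factories for AVG(field 1).
    IFieldAggregateDescriptorFactory[] buildAggregators =
            new IFieldAggregateDescriptorFactory[] { new AvgAggregatorFactory(1) };
    IFieldAggregateDescriptorFactory[] mergeAggregators =
            new IFieldAggregateDescriptorFactory[] { new AvgMergeAggregatorFactory(keyFields.length) };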
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
new file mode 100644
index 0000000..3124ae6
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * A simple integer sum aggregator that demonstrates the usage of the new
+ * {@link IFieldAggregateDescriptorFactory} and
+ * {@link IFieldAggregateDescriptor} interfaces.
+ */
+public class IntSumAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int aggField;
+
+    public IntSumAggregatorFactory(int aggField) {
+        this.aggField = aggField;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+
+        return new IFieldAggregateDescriptor() {
+
+            @Override
+            public void reset(IAggregateState state) {
+                state.reset();
+            }
+
+            @Override
+            public void outputPartialResult(DataOutput fieldOutput,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
+                int sum;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                } else {
+                    sum = (Integer) (state.getState());
+                }
+                try {
+                    fieldOutput.writeInt(sum);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state)
+                    throws HyracksDataException {
+                int sum;
+                if (data != null) {
+                    sum = IntegerSerializerDeserializer.getInt(data, offset);
+                } else {
+                    sum = (Integer) (state.getState());
+                }
+                try {
+                    fieldOutput.writeInt(sum);
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex,
+                    DataOutput fieldOutput, IAggregateState state)
+                    throws HyracksDataException {
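+                // Seed the aggregation with the first tuple's field value,
+                // writing it either to the output buffer or into the
+                // object-backed state.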
+                int sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                if (fieldOutput != null) {
+                    try {
+                        fieldOutput.writeInt(sum);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(
+                                "I/O exception when initializing the aggregator.");
+                    }
+                } else {
+                    state.setState(Integer.valueOf(sum));
+                }
+            }
+
+            @Override
+            public IAggregateState createState() {
+                return new IAggregateState() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    Integer sum = null;
+
+                    @Override
+                    public int getLength() {
+                        return 4;
+                    }
+
+                    @Override
+                    public Object getState() {
+                        return sum;
+                    }
+
+                    @Override
+                    public void setState(Object obj) {
+                        sum = (Integer) obj;
+                    }
+
+                    @Override
+                    public void reset() {
+                        sum = null;
+                    }
+
+                };
+            }
+
+            @Override
+            public void close() {
+                // No resources to release for this field aggregator.
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
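+                // Add the tuple's integer field to the running sum, updating
+                // the frame-backed byte array in place when it is available,
+                // otherwise the object-backed state.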
+                int sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                if (data != null) {
+                    ByteBuffer buf = ByteBuffer.wrap(data);
+                    sum += buf.getInt(offset);
+                    buf.putInt(offset, sum);
+                } else {
+                    sum += (Integer) state.getState();
+                    state.setState(Integer.valueOf(sum));
+                }
+            }
+        };
+    }
+
+}
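
To exercise the new interfaces, a grouping operator would construct this factory with the index of the integer field to sum, obtain a descriptor, and then drive it through createState(), init(), aggregate(), and outputFinalResult(). A minimal construction sketch follows; it is illustrative only: the field index 1 is an assumption, and the null context and record descriptors are passed solely because this particular implementation does not consult them.

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
    import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
    import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;

    // Hypothetical driver, not part of this patch.
    public class IntSumUsageDemo {
        public static void main(String[] args) throws HyracksDataException {
            // Sum the integer in field 1 of each group (index is an
            // assumption for illustration).
            IFieldAggregateDescriptorFactory factory =
                    new IntSumAggregatorFactory(1);
            // ctx and record descriptors are unused by this implementation,
            // so nulls stand in here purely for illustration.
            IFieldAggregateDescriptor aggregator =
                    factory.createAggregator(null, null, null);
            IAggregateState state = aggregator.createState();
            // A group operator would now call init()/aggregate() with frame
            // tuple accessors and emit results via outputFinalResult().
            aggregator.close();
        }
    }
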