Pre-merge cleanup: removed old aggregator interfaces, and updated codes to use the new interface.  

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@963 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
deleted file mode 100644
index 2327965..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
-
-/**
- *
- */
-public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    private static final int AGGREGATE_ACTIVITY_ID = 0;
-
-    private static final int MERGE_ACTIVITY_ID = 1;
-
-    private static final long serialVersionUID = 1L;
-    private final int[] keyFields;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-    private final IAggregatorDescriptorFactory mergerFactory;
-
-    private final int framesLimit;
-    private final ISpillableTableFactory spillableTableFactory;
-    private final boolean isOutputSorted;
-
-    public ExternalGroupOperatorDescriptor(JobSpecification spec,
-            int[] keyFields, int framesLimit,
-            IBinaryComparatorFactory[] comparatorFactories,
-            INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory,
-            IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor recordDescriptor,
-            ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        if (framesLimit <= 1) {
-            /**
-             * Minimum of 2 frames: 1 for input records, and 1 for output
-             * aggregation results.
-             */
-            throw new IllegalStateException(
-                    "frame limit should at least be 2, but it is "
-                            + framesLimit + "!");
-        }
-        this.aggregatorFactory = aggregatorFactory;
-        this.mergerFactory = mergerFactory;
-        this.keyFields = keyFields;
-        this.comparatorFactories = comparatorFactories;
-        this.firstNormalizerFactory = firstNormalizerFactory;
-        this.spillableTableFactory = spillableTableFactory;
-        this.isOutputSorted = isOutputSorted;
-
-        /**
-         * Set the record descriptor. Note that since this operator is a unary
-         * operator, only the first record descriptor is used here.
-         */
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
-     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
-     */
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
-                getOperatorId(), AGGREGATE_ACTIVITY_ID));
-        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
-                MERGE_ACTIVITY_ID));
-
-        builder.addActivity(aggregateAct);
-        builder.addSourceEdge(0, aggregateAct, 0);
-
-        builder.addActivity(mergeAct);
-        builder.addTargetEdge(0, mergeAct, 0);
-
-        builder.addBlockingEdge(aggregateAct, mergeAct);
-    }
-
-    public static class AggregateActivityState extends AbstractTaskState {
-        private LinkedList<RunFileReader> runs;
-
-        private ISpillableTable gTable;
-
-        public AggregateActivityState() {
-        }
-
-        private AggregateActivityState(JobId jobId, TaskId tId) {
-            super(jobId, tId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private class AggregateActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public AggregateActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions)
-                throws HyracksDataException {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(
-                    ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0));
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private AggregateActivityState state;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = new AggregateActivityState(ctx.getJobletContext()
-                            .getJobId(), new TaskId(getActivityId(), partition));
-                    state.runs = new LinkedList<RunFileReader>();
-                    state.gTable = spillableTableFactory.buildSpillableTable(
-                            ctx, keyFields, comparatorFactories,
-                            firstNormalizerFactory, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(
-                                    getOperatorId(), 0), recordDescriptors[0],
-                            ExternalGroupOperatorDescriptor.this.framesLimit);
-                    state.gTable.reset();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer)
-                        throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; i++) {
-                        /**
-                         * If the group table is too large, flush the table into
-                         * a run file.
-                         */
-                        if (!state.gTable.insert(accessor, i)) {
-                            flushFramesToRun();
-                            if (!state.gTable.insert(accessor, i))
-                                throw new HyracksDataException(
-                                        "Failed to insert a new buffer into the aggregate operator!");
-                        }
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (state.gTable.getFrameCount() >= 0) {
-                        if (state.runs.size() > 0) {
-                            /**
-                             * flush the memory into the run file.
-                             */
-                            flushFramesToRun();
-                            state.gTable.close();
-                            state.gTable = null;
-                        }
-                    }
-                    ctx.setTaskState(state);
-                }
-
-                private void flushFramesToRun() throws HyracksDataException {
-                    FileReference runFile;
-                    try {
-                        runFile = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(
-                                        ExternalGroupOperatorDescriptor.class
-                                                .getSimpleName());
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    RunFileWriter writer = new RunFileWriter(runFile,
-                            ctx.getIOManager());
-                    writer.open();
-                    try {
-                        state.gTable.sortFrames();
-                        state.gTable.flushFrames(writer, true);
-                    } catch (Exception ex) {
-                        throw new HyracksDataException(ex);
-                    } finally {
-                        writer.close();
-                    }
-                    state.gTable.reset();
-                    state.runs.add(((RunFileWriter) writer).createReader());
-                }
-            };
-            return op;
-        }
-    }
-
-    private class MergeActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions)
-                throws HyracksDataException {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i]
-                        .createBinaryComparator();
-            }
-
-            int[] keyFieldsInPartialResults = new int[keyFields.length];
-            for(int i = 0; i < keyFieldsInPartialResults.length; i++){
-                keyFieldsInPartialResults[i] = i;
-            }
-            
-            final IAggregatorDescriptor aggregator = mergerFactory
-                    .createAggregator(ctx, recordDescriptors[0],
-                            recordDescriptors[0], keyFields, keyFieldsInPartialResults);
-            final AggregateState aggregateState = aggregator
-                    .createAggregateStates();
-
-            final int[] storedKeys = new int[keyFields.length];
-            /**
-             * Get the list of the fields in the stored records.
-             */
-            for (int i = 0; i < keyFields.length; ++i) {
-                storedKeys[i] = i;
-            }
-
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                /**
-                 * Input frames, one for each run file.
-                 */
-                private List<ByteBuffer> inFrames;
-
-                /**
-                 * Output frame.
-                 */
-                private ByteBuffer outFrame, writerFrame;
-
-                private LinkedList<RunFileReader> runs;
-
-                private AggregateActivityState aggState;
-
-                /**
-                 * how many frames to be read ahead once
-                 */
-                private int runFrameLimit = 1;
-
-                private int[] currentFrameIndexInRun;
-                private int[] currentRunFrames;
-                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
-                        ctx.getFrameSize());
-                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
-                        ctx.getFrameSize(), recordDescriptors[0]);
-                private FrameTupleAppender writerFrameAppender;
-
-                public void initialize() throws HyracksDataException {
-                    aggState = (AggregateActivityState) ctx
-                            .getTaskState(new TaskId(new ActivityId(
-                                    getOperatorId(), AGGREGATE_ACTIVITY_ID),
-                                    partition));
-                    runs = aggState.runs;
-                    writer.open();
-                    try {
-                        if (runs.size() <= 0) {
-                            ISpillableTable gTable = aggState.gTable;
-                            if (gTable != null) {
-                                if (isOutputSorted)
-                                    gTable.sortFrames();
-                                gTable.flushFrames(writer, false);
-                            }
-                        } else {
-                            runs = new LinkedList<RunFileReader>(runs);
-                            inFrames = new ArrayList<ByteBuffer>();
-                            outFrame = ctx.allocateFrame();
-                            outFrameAppender.reset(outFrame, true);
-                            outFrameAccessor.reset(outFrame);
-                            while (runs.size() > 0) {
-                                try {
-                                    doPass(runs);
-                                } catch (Exception e) {
-                                    throw new HyracksDataException(e);
-                                }
-                            }
-                            inFrames.clear();
-                        }
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        aggregateState.close();
-                        writer.close();
-                    }
-                }
-
-                private void doPass(LinkedList<RunFileReader> runs)
-                        throws HyracksDataException {
-                    FileReference newRun = null;
-                    IFrameWriter writer = this.writer;
-                    boolean finalPass = false;
-
-                    while (inFrames.size() + 2 < framesLimit) {
-                        inFrames.add(ctx.allocateFrame());
-                    }
-                    int runNumber;
-                    if (runs.size() + 2 <= framesLimit) {
-                        finalPass = true;
-                        runFrameLimit = (framesLimit - 2) / runs.size();
-                        runNumber = runs.size();
-                    } else {
-                        runNumber = framesLimit - 2;
-                        newRun = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(
-                                        ExternalGroupOperatorDescriptor.class
-                                                .getSimpleName());
-                        writer = new RunFileWriter(newRun, ctx.getIOManager());
-                        writer.open();
-                    }
-                    try {
-                        currentFrameIndexInRun = new int[runNumber];
-                        currentRunFrames = new int[runNumber];
-                        /**
-                         * Create file readers for each input run file, only for
-                         * the ones fit into the inFrames
-                         */
-                        RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
-                                .size()];
-                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
-                                ctx.getFrameSize(), recordDescriptors[0],
-                                runNumber, comparator);
-                        /**
-                         * current tuple index in each run
-                         */
-                        int[] tupleIndices = new int[runNumber];
-
-                        for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
-                            tupleIndices[runIndex] = 0;
-                            // Load the run file
-                            runFileReaders[runIndex] = runs.get(runIndex);
-                            runFileReaders[runIndex].open();
-
-                            currentRunFrames[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex] = runIndex
-                                    * runFrameLimit;
-                            for (int j = 0; j < runFrameLimit; j++) {
-                                int frameIndex = currentFrameIndexInRun[runIndex]
-                                        + j;
-                                if (runFileReaders[runIndex].nextFrame(inFrames
-                                        .get(frameIndex))) {
-                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(
-                                            ctx.getFrameSize(),
-                                            recordDescriptors[0]);
-                                    tupleAccessors[frameIndex].reset(inFrames
-                                            .get(frameIndex));
-                                    currentRunFrames[runIndex]++;
-                                    if (j == 0)
-                                        setNextTopTuple(runIndex, tupleIndices,
-                                                runFileReaders, tupleAccessors,
-                                                topTuples);
-                                } else {
-                                    closeRun(runIndex, runFileReaders, tupleAccessors);
-                                    break;
-                                }
-                            }
-                        }
-
-                        /**
-                         * Start merging
-                         */
-                        while (!topTuples.areRunsExhausted()) {
-                            /**
-                             * Get the top record
-                             */
-                            ReferenceEntry top = topTuples.peek();
-                            int tupleIndex = top.getTupleIndex();
-                            int runIndex = topTuples.peek().getRunid();
-                            FrameTupleAccessor fta = top.getAccessor();
-
-                            int currentTupleInOutFrame = outFrameAccessor
-                                    .getTupleCount() - 1;
-                            if (currentTupleInOutFrame < 0
-                                    || compareFrameTuples(fta, tupleIndex,
-                                            outFrameAccessor,
-                                            currentTupleInOutFrame) != 0) {
-                                /**
-                                 * Initialize the first output record Reset the
-                                 * tuple builder
-                                 */
-                                if (!aggregator.initFromPartial(outFrameAppender, fta,
-                                        tupleIndex, aggregateState)) {
-                                    flushOutFrame(writer, finalPass);
-                                    if (!aggregator.initFromPartial(outFrameAppender, fta,
-                                            tupleIndex, aggregateState)) {
-                                        throw new HyracksDataException(
-                                                "Failed to append an aggregation result to the output frame.");
-                                    }
-                                }
-
-                            } else {
-                                /**
-                                 * if new tuple is in the same group of the
-                                 * current aggregator do merge and output to the
-                                 * outFrame
-                                 */
-
-                                aggregator.aggregate(fta, tupleIndex,
-                                        outFrameAccessor,
-                                        currentTupleInOutFrame, aggregateState);
-
-                            }
-                            tupleIndices[runIndex]++;
-                            setNextTopTuple(runIndex, tupleIndices,
-                                    runFileReaders, tupleAccessors, topTuples);
-                        }
-
-                        if (outFrameAppender.getTupleCount() > 0) {
-                            flushOutFrame(writer, finalPass);
-                        }
-
-                        aggregator.close();
-                        
-                        runs.subList(0, runNumber).clear();
-                        /**
-                         * insert the new run file into the beginning of the run
-                         * file list
-                         */
-                        if (!finalPass) {
-                            runs.add(0, ((RunFileWriter) writer).createReader());
-                        }
-                    } finally {
-                        if (!finalPass) {
-                            writer.close();
-                        }
-                    }
-                }
-
-                private void flushOutFrame(IFrameWriter writer, boolean isFinal)
-                        throws HyracksDataException {
-                    if (writerFrame == null) {
-                        writerFrame = ctx.allocateFrame();
-                    }
-                    if (writerFrameAppender == null) {
-                        writerFrameAppender = new FrameTupleAppender(
-                                ctx.getFrameSize());
-                        writerFrameAppender.reset(writerFrame, true);
-                    }
-                    outFrameAccessor.reset(outFrame);
-
-                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-                        
-                        if(isFinal){
-                            if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
-                                FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameAppender.reset(writerFrame, true);
-                                if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
-                                    throw new HyracksDataException(
-                                            "Failed to write final aggregation result to a writer frame!");
-                                }
-                            }
-                        } else {
-                            if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
-                                FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameAppender.reset(writerFrame, true);
-                                if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
-                                    throw new HyracksDataException(
-                                            "Failed to write final aggregation result to a writer frame!");
-                                }
-                            }
-                        }
-                    }
-                    if (writerFrameAppender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(writerFrame, writer);
-                        writerFrameAppender.reset(writerFrame, true);
-                    }
-                    outFrameAppender.reset(outFrame, true);
-                }
-
-                private void setNextTopTuple(int runIndex, int[] tupleIndices,
-                        RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors,
-                        ReferencedPriorityQueue topTuples)
-                        throws HyracksDataException {
-                    int runStart = runIndex * runFrameLimit;
-                    boolean existNext = false;
-                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
-                            || runCursors[runIndex] == null) {
-                        /**
-                         * run already closed
-                         */
-                        existNext = false;
-                    } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
-                        /**
-                         * not the last frame for this run
-                         */
-                        existNext = true;
-                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
-                                .getTupleCount()) {
-                            tupleIndices[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex]++;
-                        }
-                    } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
-                            .getTupleCount()) {
-                        /**
-                         * the last frame has expired
-                         */
-                        existNext = true;
-                    } else {
-                        /**
-                         * If all tuples in the targeting frame have been
-                         * checked.
-                         */
-                        int frameOffset = runIndex * runFrameLimit;
-                        tupleIndices[runIndex] = 0;
-                        currentFrameIndexInRun[runIndex] = frameOffset;
-                        /**
-                         * read in batch
-                         */
-                        currentRunFrames[runIndex] = 0;
-                        for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
-                            ByteBuffer buffer = tupleAccessors[frameOffset]
-                                    .getBuffer();
-                            if (runCursors[runIndex].nextFrame(buffer)) {
-                                tupleAccessors[frameOffset].reset(buffer);
-                                if (tupleAccessors[frameOffset].getTupleCount() > 0) {
-                                    existNext = true;
-                                } else {
-                                    throw new IllegalStateException(
-                                            "illegal: empty run file");
-                                }
-                                currentRunFrames[runIndex]++;
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-
-                    if (existNext) {
-                        topTuples
-                                .popAndReplace(
-                                        tupleAccessors[currentFrameIndexInRun[runIndex]],
-                                        tupleIndices[runIndex]);
-                    } else {
-                        topTuples.pop();
-                        closeRun(runIndex, runCursors, tupleAccessors);
-                    }
-                }
-
-                /**
-                 * Close the run file, and also the corresponding readers and
-                 * input frame.
-                 * 
-                 * @param index
-                 * @param runCursors
-                 * @param tupleAccessor
-                 * @throws HyracksDataException
-                 */
-                private void closeRun(int index, RunFileReader[] runCursors,
-                        IFrameTupleAccessor[] tupleAccessor)
-                        throws HyracksDataException {
-                    if (runCursors[index] != null) {
-                        runCursors[index].close();
-                        runCursors[index] = null;
-                        tupleAccessor[index] = null;
-                    }
-                }
-
-                private int compareFrameTuples(IFrameTupleAccessor fta1,
-                        int j1, IFrameTupleAccessor fta2, int j2) {
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1)
-                                + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldLength(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2)
-                                + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2_start = fta2.getFieldStartOffset(j2, fIdx);
-                        int l2_end = fta2.getFieldEndOffset(j2, fIdx);
-                        int l2 = l2_end - l2_start;
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-            };
-            return op;
-        }
-
-        private Comparator<ReferenceEntry> createEntryComparator(
-                final IBinaryComparator[] comparators) {
-            return new Comparator<ReferenceEntry>() {
-
-                @Override
-                public int compare(ReferenceEntry o1, ReferenceEntry o2) {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
-                            .getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
-                            .getAccessor();
-                    int j1 = o1.getTupleIndex();
-                    int j2 = o2.getTupleIndex();
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1)
-                                + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx)
-                                - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2)
-                                + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx)
-                                - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-
-            };
-        }
-
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
deleted file mode 100644
index d5edd25..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-
-class GroupingHashTable {
-    /**
-     * The pointers in the link store 3 int values for each entry in the
-     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-     * 
-     * @author vinayakb
-     */
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 9;
-
-        int[] pointers;
-        int size;
-
-        Link() {
-            pointers = new int[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-            while (size + 3 > pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
-            }
-            pointers[size++] = bufferIdx;
-            pointers[size++] = tIndex;
-            pointers[size++] = accumulatorIdx;
-        }
-    }
-
-    private static final int INIT_AGG_STATE_SIZE = 8;
-    private final IHyracksTaskContext ctx;
-    private final FrameTupleAppender appender;
-    private final List<ByteBuffer> buffers;
-    private final Link[] table;
-    /**
-     * Aggregate states: a list of states for all groups maintained in the main
-     * memory.
-     */
-    private AggregateState[] aggregateStates;
-    private int accumulatorSize;
-
-    private int lastBIndex;
-    private final int[] storedKeys;
-    private final IBinaryComparator[] comparators;
-    private final FrameTuplePairComparator ftpc;
-    private final ITuplePartitionComputer tpc;
-    private final IAggregatorDescriptor aggregator;
-
-    private final FrameTupleAccessor storedKeysAccessor;
-
-    GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
-            IBinaryComparatorFactory[] comparatorFactories,
-            ITuplePartitionComputerFactory tpcf,
-            IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int tableSize)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        buffers = new ArrayList<ByteBuffer>();
-        table = new Link[tableSize];
-
-        storedKeys = new int[fields.length];
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-        for (int i = 0; i < fields.length; ++i) {
-            storedKeys[i] = i;
-            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-        }
-
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-        tpc = tpcf.createPartitioner();
-
-        int[] keyFieldsInPartialResults = new int[fields.length];
-        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
-            keyFieldsInPartialResults[i] = i;
-        }
-        
-        this.aggregator = aggregatorFactory.createAggregator(ctx,
-                inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
-
-        this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
-        accumulatorSize = 0;
-
-        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
-                storedKeySerDeser);
-        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                storedKeysRecordDescriptor);
-        lastBIndex = -1;
-        addNewBuffer();
-    }
-
-    private void addNewBuffer() {
-        ByteBuffer buffer = ctx.allocateFrame();
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        buffers.add(buffer);
-        appender.reset(buffer, true);
-        ++lastBIndex;
-    }
-
-    private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
-            throws HyracksDataException {
-        ByteBuffer frame = appender.getBuffer();
-        frame.position(0);
-        frame.limit(frame.capacity());
-        writer.nextFrame(appender.getBuffer());
-        appender.reset(appender.getBuffer(), true);
-    }
-
-    void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
-        int entry = tpc.partition(accessor, tIndex, table.length);
-        Link link = table[entry];
-        if (link == null) {
-            link = table[entry] = new Link();
-        }
-        int saIndex = -1;
-        for (int i = 0; i < link.size; i += 3) {
-            int sbIndex = link.pointers[i];
-            int stIndex = link.pointers[i + 1];
-            storedKeysAccessor.reset(buffers.get(sbIndex));
-            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
-            if (c == 0) {
-                saIndex = link.pointers[i + 2];
-                break;
-            }
-        }
-        if (saIndex < 0) {
-            // Did not find the key. Insert a new entry.
-            saIndex = accumulatorSize++;
-            // Add keys
-
-            // Add index to the keys in frame
-            int sbIndex = lastBIndex;
-            int stIndex = appender.getTupleCount();
-            
-            // Add aggregation fields
-            AggregateState newState = aggregator.createAggregateStates();
-            
-            if(!aggregator.init(appender, accessor, tIndex, newState)){
-                addNewBuffer();
-                sbIndex = lastBIndex;
-                stIndex = appender.getTupleCount();
-                if(!aggregator.init(appender, accessor, tIndex, newState)){
-                    throw new IllegalStateException();
-                }
-            }
-            
-            if (accumulatorSize >= aggregateStates.length) {
-                aggregateStates = Arrays.copyOf(aggregateStates,
-                        aggregateStates.length * 2);
-            }
-
-            aggregateStates[saIndex] = newState;
-
-            link.add(sbIndex, stIndex, saIndex);
-        } else {
-            aggregator.aggregate(accessor, tIndex, null, 0,
-                    aggregateStates[saIndex]);
-        }
-    }
-
-    void write(IFrameWriter writer) throws HyracksDataException {
-        ByteBuffer buffer = ctx.allocateFrame();
-        appender.reset(buffer, true);
-        for (int i = 0; i < table.length; ++i) {
-            Link link = table[i];
-            if (link != null) {
-                for (int j = 0; j < link.size; j += 3) {
-                    int bIndex = link.pointers[j];
-                    int tIndex = link.pointers[j + 1];
-                    int aIndex = link.pointers[j + 2];
-                    ByteBuffer keyBuffer = buffers.get(bIndex);
-                    storedKeysAccessor.reset(keyBuffer);
-
-                    while (!aggregator
-                            .outputFinalResult(appender, storedKeysAccessor,
-                                    tIndex, aggregateStates[aIndex])) {
-                        flushFrame(appender, writer);
-                    }
-                }
-            }
-        }
-        if (appender.getTupleCount() != 0) {
-            flushFrame(appender, writer);
-        }
-    }
-    
-    void close() throws HyracksDataException {
-        for(AggregateState aState : aggregateStates){
-            aState.close();
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
deleted file mode 100644
index 2596b91..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- *
- */
-public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    private static final int HASH_BUILD_ACTIVITY_ID = 0;
-
-    private static final int OUTPUT_ACTIVITY_ID = 1;
-
-    private static final long serialVersionUID = 1L;
-
-    private final int[] keys;
-    private final ITuplePartitionComputerFactory tpcf;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-
-    private final int tableSize;
-
-    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
-            ITuplePartitionComputerFactory tpcf,
-            IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor outRecordDescriptor, int tableSize) {
-        super(spec, 1, 1);
-        this.keys = keys;
-        this.tpcf = tpcf;
-        this.comparatorFactories = comparatorFactories;
-        this.aggregatorFactory = aggregatorFactory;
-        recordDescriptors[0] = outRecordDescriptor;
-        this.tableSize = tableSize;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
-     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
-     */
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
-                HASH_BUILD_ACTIVITY_ID));
-        builder.addActivity(ha);
-
-        OutputActivity oa = new OutputActivity(new ActivityId(odId,
-                OUTPUT_ACTIVITY_ID));
-        builder.addActivity(oa);
-
-        builder.addSourceEdge(0, ha, 0);
-        builder.addTargetEdge(0, oa, 0);
-        builder.addBlockingEdge(ha, oa);
-    }
-
-    public static class HashBuildActivityState extends AbstractTaskState {
-        private GroupingHashTable table;
-
-        public HashBuildActivityState() {
-        }
-
-        private HashBuildActivityState(JobId jobId, TaskId tId) {
-            super(jobId, tId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-    }
-
-    private class HashBuildActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public HashBuildActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(
-                    ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0));
-            return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private HashBuildActivityState state;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = new HashBuildActivityState(ctx.getJobletContext()
-                            .getJobId(), new TaskId(getActivityId(), partition));
-                    state.table = new GroupingHashTable(ctx, keys,
-                            comparatorFactories, tpcf, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(
-                                    getOperatorId(), 0), recordDescriptors[0],
-                            tableSize);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer)
-                        throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; ++i) {
-                        try {
-                            state.table.insert(accessor, i);
-                        } catch (Exception e) {
-                            System.out.println(e.toString());
-                            throw new HyracksDataException(e);
-                        }
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    ctx.setTaskState(state);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    throw new HyracksDataException(
-                            "HashGroupOperator is failed.");
-                }
-            };
-        }
-    }
-
-    private class OutputActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public OutputActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
-            return new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    HashBuildActivityState buildState = (HashBuildActivityState) ctx
-                            .getTaskState(new TaskId(new ActivityId(
-                                    getOperatorId(), HASH_BUILD_ACTIVITY_ID),
-                                    partition));
-                    GroupingHashTable table = buildState.table;
-                    writer.open();
-                    try {
-                        table.write(writer);
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
-                }
-            };
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
deleted file mode 100644
index cc8cbd2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTable.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public interface ISpillableTable {
-
-    public void close();
-
-    public void reset();
-
-    public int getFrameCount();
-
-    public List<ByteBuffer> getFrames();
-
-    public void sortFrames();
-
-    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
-    public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
deleted file mode 100644
index bae041d..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface ISpillableTableFactory extends Serializable {
-    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
-            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
deleted file mode 100644
index 1c1c7a2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-public class PreclusteredGroupOperatorDescriptor extends
-        AbstractSingleActivityOperatorDescriptor {
-    private final int[] groupFields;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IAggregatorDescriptorFactory aggregatorFactory;
-
-    private static final long serialVersionUID = 1L;
-
-    public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
-            int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor) {
-        super(spec, 1, 1);
-        this.groupFields = groupFields;
-        this.comparatorFactories = comparatorFactories;
-        this.aggregatorFactory = aggregatorFactory;
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(
-            final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition,
-            int nPartitions) throws HyracksDataException {
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        final RecordDescriptor inRecordDesc = recordDescProvider
-                .getInputRecordDescriptor(getOperatorId(), 0);
-        final IAggregatorDescriptor aggregator = aggregatorFactory
-                .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
-                        groupFields, groupFields);
-        final ByteBuffer copyFrame = ctx.allocateFrame();
-        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
-                ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor.reset(copyFrame);
-        ByteBuffer outFrame = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(
-                ctx.getFrameSize());
-        appender.reset(outFrame, true);
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-            private PreclusteredGroupWriter pgw;
-
-            @Override
-            public void open() throws HyracksDataException {
-                pgw = new PreclusteredGroupWriter(ctx, groupFields,
-                        comparators, aggregator, inRecordDesc, writer);
-                pgw.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer)
-                    throws HyracksDataException {
-                pgw.nextFrame(buffer);
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                pgw.fail();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                pgw.close();
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
deleted file mode 100644
index db6e4ab..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class PreclusteredGroupWriter implements IFrameWriter {
-    private final int[] groupFields;
-    private final IBinaryComparator[] comparators;
-    private final IAggregatorDescriptor aggregator;
-    private final AggregateState aggregateState;
-    private final IFrameWriter writer;
-    private final ByteBuffer copyFrame;
-    private final FrameTupleAccessor inFrameAccessor;
-    private final FrameTupleAccessor copyFrameAccessor;
-    private final ByteBuffer outFrame;
-    private final FrameTupleAppender appender;
-    private boolean first;
-
-    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
-        this.groupFields = groupFields;
-        this.comparators = comparators;
-        this.aggregator = aggregator;
-        this.aggregateState = aggregator.createAggregateStates();
-        this.writer = writer;
-        copyFrame = ctx.allocateFrame();
-        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor.reset(copyFrame);
-        outFrame = ctx.allocateFrame();
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(outFrame, true);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-        first = true;
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inFrameAccessor.reset(buffer);
-        int nTuples = inFrameAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; ++i) {
-            if (first) {
-                aggregator.init(null, inFrameAccessor, i, aggregateState);
-                first = false;
-            } else {
-                if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
-                } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
-                }
-                
-            }
-        }
-        FrameUtils.copy(buffer, copyFrame);
-    }
-
-    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
-            writeOutput(prevTupleAccessor, prevTupleIndex);
-            aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
-        } else {
-            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
-        }
-    }
-
-    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
-            throws HyracksDataException {
-        if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
-            FrameUtils.flushFrame(appender.getBuffer(), writer);
-            appender.reset(appender.getBuffer(), true);
-            if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
-        for (int i = 0; i < comparators.length; ++i) {
-            int fIdx = groupFields[i];
-            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
-            int l1 = a1.getFieldLength(t1Idx, fIdx);
-            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
-            int l2 = a2.getFieldLength(t2Idx, fIdx);
-            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (!first) {
-            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(appender.getBuffer(), writer);
-            }
-        }
-        aggregateState.close();
-        writer.close();
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
deleted file mode 100644
index 45d49e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-    private final int avgField;
-    private int outField = -1;
-
-    public AvgAggregatorDescriptorFactory(int avgField) {
-        this.avgField = avgField;
-    }
-
-    public AvgAggregatorDescriptorFactory(int avgField, int outField) {
-        this.avgField = avgField;
-        this.outField = outField;
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
-
-        if (this.outField < 0)
-            this.outField = keyFields.length;
-
-        return new IAggregatorDescriptor() {
-
-            @Override
-            public void reset() {
-
-            }
-
-            @Override
-            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-                int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
-
-                try {
-                    tupleBuilder.getDataOutput().writeInt(sum / count);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-
-            @Override
-            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-                int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
-                try {
-                    tupleBuilder.getDataOutput().writeInt(sum);
-                    tupleBuilder.getDataOutput().writeInt(count);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                // Init aggregation value
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, avgField);
-                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-                int count = 1;
-
-                try {
-                    tupleBuilder.getDataOutput().writeInt(sum);
-                    tupleBuilder.getDataOutput().writeInt(count);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                int sum1 = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-                int count1 = 1;
-
-                int sum2 = IntegerSerializerDeserializer.getInt(data, offset);
-                int count2 = IntegerSerializerDeserializer.getInt(data, offset + 4);
-
-                ByteBuffer buf = ByteBuffer.wrap(data, offset, 8);
-                buf.putInt(sum1 + sum2);
-                buf.putInt(count1 + count2);
-
-                return 8;
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
deleted file mode 100644
index 1cc3340..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-
-public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-    private static final int INIT_ACCUMULATORS_SIZE = 8;
-    private final int concatField;
-    private int outField = -1;
-
-    /**
-     * Initialize the aggregator, with the field to be concatenated.
-     * 
-     * @param concatField
-     */
-    public ConcatAggregatorDescriptorFactory(int concatField) {
-        this.concatField = concatField;
-    }
-
-    /**
-     * Initialize the aggregator, with the field index to be concatenated, and
-     * also the field where the aggregation result will be outputted.
-     * 
-     * @param concatField
-     * @param outField
-     */
-    public ConcatAggregatorDescriptorFactory(int concatField, int outField) {
-        this.concatField = concatField;
-        this.outField = outField;
-    }
-
-    /**
-     * Create a concatenation aggregator. A byte buffer will be allocated inside of the
-     * aggregator to contain the partial aggregation results. A reference will be written
-     * onto the output frame for indexing the aggregation result from the buffer.
-     */
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
-        if (this.outField < 0)
-            this.outField = keyFields.length;
-
-        return new IAggregatorDescriptor() {
-
-            byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
-
-            int currentAggregatorIndex = -1;
-            int aggregatorCount = 0;
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                // Initialize the aggregation value
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
-                int fieldLength = accessor.getFieldLength(tIndex, concatField);
-                int appendOffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
-                // Get the initial value
-                currentAggregatorIndex++;
-                if (currentAggregatorIndex >= buf.length) {
-                    byte[][] newBuf = new byte[buf.length * 2][];
-                    for (int i = 0; i < buf.length; i++) {
-                        newBuf[i] = buf[i];
-                    }
-                    this.buf = newBuf;
-                }
-                buf[currentAggregatorIndex] = new byte[fieldLength];
-                System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
-                        fieldLength);
-                // Update the aggregator index
-                aggregatorCount++;
-
-                try {
-                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-
-            @Override
-            public void reset() {
-                currentAggregatorIndex = -1;
-                aggregatorCount = 0;
-            }
-
-            @Override
-            public void close() {
-                currentAggregatorIndex = -1;
-                aggregatorCount = 0;
-                for (int i = 0; i < buf.length; i++) {
-                    buf[i] = null;
-                }
-            }
-
-            @Override
-            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-                    throws HyracksDataException {
-                int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
-                // FIXME Should be done in binary way
-                StringBuilder sbder = new StringBuilder();
-                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
-                        new ByteArrayInputStream(buf[refIndex]))));
-                // Get the new data
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
-                int fieldLength = accessor.getFieldLength(tIndex, concatField);
-                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
-                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
-                                + accessor.getFieldSlotsLength() + fieldStart, fieldLength))));
-
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
-                buf[refIndex] = baos.toByteArray();
-                return 4;
-            }
-
-            @Override
-            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-                int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
-                        + accessor.getFieldSlotsLength() + fieldStart);
-
-                try {
-                    if (refIndex >= 0)
-                        tupleBuilder.getDataOutput().write(buf[refIndex]);
-                    else {
-                        int fieldLength = accessor.getFieldLength(tIndex, outField);
-                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                                tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4, fieldLength - 4);
-                    }
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-
-            @Override
-            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
-                int fieldLength = accessor.getFieldLength(tIndex, outField);
-                int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
-                        + accessor.getFieldSlotsLength() + fieldOffset);
-
-                try {
-                    tupleBuilder.getDataOutput().writeInt(-1);
-                    if (refIndex < 0) {
-                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                                tupleOffset + accessor.getFieldSlotsLength() + fieldOffset + 4, fieldLength - 4);
-                    } else {
-                        tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
-                    }
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException();
-                }
-            }
-        };
-    }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
deleted file mode 100644
index 74ac53a..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-    private int outField = -1;
-
-    public CountAggregatorDescriptorFactory() {
-    }
-
-    public CountAggregatorDescriptorFactory(int outField) {
-        this.outField = outField;
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
-        if (this.outField < 0) {
-            this.outField = keyFields.length;
-        }
-        return new IAggregatorDescriptor() {
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
-            }
-
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-                    throws HyracksDataException {
-                ByteBuffer buf = ByteBuffer.wrap(data);
-                int count = buf.getInt(offset);
-                buf.putInt(offset, count + 1);
-                return 4;
-            }
-
-            @Override
-            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
-                try {
-                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException("Failed to write int sum as a partial result.");
-                }
-            }
-
-            @Override
-            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
-                try {
-                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException("Failed to write int sum as a partial result.");
-                }
-            }
-
-            @Override
-            public void reset() {
-
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
deleted file mode 100644
index f1e54e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class CountAggregatorFactory implements IFieldValueResultingAggregatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-        return new IFieldValueResultingAggregator() {
-            private int count;
-
-            @Override
-            public void output(DataOutput resultAcceptor) throws HyracksDataException {
-                try {
-                    resultAcceptor.writeInt(count);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                count = 0;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                ++count;
-            }
-        };
-    }
-
-    @Override
-    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-        return new ISpillableFieldValueResultingAggregator() {
-            private int count;
-
-            @Override
-            public void output(DataOutput resultAcceptor) throws HyracksDataException {
-                try {
-                    resultAcceptor.writeInt(count);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex)
-                    throws HyracksDataException {
-                count = IntegerSerializerDeserializer.getInt(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                count++;
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                count = 0;
-            }
-
-            @Override
-            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
-                    throws HyracksDataException {
-                count += IntegerSerializerDeserializer.getInt(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
deleted file mode 100644
index 515cc21..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-
-/**
- * SUM aggregator on float type data.
- * 
- */
-public class FloatSumAggregatorFactory implements
-        IFieldValueResultingAggregatorFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-    private int sumField;
-
-    public FloatSumAggregatorFactory(int field) {
-        this.sumField = field;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-     * IFieldValueResultingAggregatorFactory
-     * #createFieldValueResultingAggregator()
-     */
-    @Override
-    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-        return new IFieldValueResultingAggregator() {
-
-            private float sum;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeFloat(sum);
-                } catch (IOException ex) {
-                    throw new HyracksDataException(ex);
-                }
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                sum = 0;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-                sum += FloatSerializerDeserializer.getFloat(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-            }
-        };
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-     * IFieldValueResultingAggregatorFactory
-     * #createSpillableFieldValueResultingAggregator()
-     */
-    @Override
-    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-        return new ISpillableFieldValueResultingAggregator() {
-
-            private float sum;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeFloat(sum);
-                } catch (IOException ex) {
-                    throw new HyracksDataException(ex);
-                }
-            }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                sum = FloatSerializerDeserializer.getFloat(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                sum = 0;
-            }
-
-            @Override
-            public void accumulatePartialResult(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                sum += FloatSerializerDeserializer.getFloat(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-                sum += FloatSerializerDeserializer.getFloat(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
deleted file mode 100644
index dfc31cd..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface IAggregatorDescriptor {
-
-    /**
-     * Initialize the aggregator with an input tuple specified by the input
-     * frame and tuple index. This function will write the initialized partial
-     * result into the tuple builder.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param tupleBuilder
-     * @throws HyracksDataException
-     */
-    public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-            throws HyracksDataException;
-
-    /**
-     * Aggregate the input tuple with the partial result specified by the bytes.
-     * The new value then is written back to the bytes field specified.
-     * It is the developer's responsibility to have the new result not exceed
-     * the given bytes.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param data
-     * @param offset
-     * @param length
-     * @return
-     * @throws HyracksDataException
-     */
-    public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-            throws HyracksDataException;
-
-    /**
-     * Output the partial aggregation result to an array tuple builder.
-     * Necessary additional information for aggregation should be maintained.
-     * For example, for an aggregator calculating AVG, the count and also the
-     * current average should be maintained as the partial results.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param tupleBuilder
-     * @throws HyracksDataException
-     */
-    public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-            throws HyracksDataException;
-
-    /**
-     * Output the final aggregation result to an array tuple builder.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param tupleBuilder
-     * @return
-     * @throws HyracksDataException
-     */
-    public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-            throws HyracksDataException;
-
-    /**
-     * reset the internal states
-     */
-    public void reset();
-
-    /**
-     * Close the aggregator. Necessary clean-up code should be implemented here.
-     */
-    public void close();
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
deleted file mode 100644
index 7f6928b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IAggregatorDescriptorFactory extends Serializable {
-
-    /**
-     * Create an aggregator.
-     * 
-     * @param ctx
-     * @param inRecordDescriptor
-     * @param outRecordDescriptor
-     * @param keyFields
-     * @return
-     * @throws HyracksDataException
-     */
-    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
deleted file mode 100644
index 783467b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFieldValueResultingAggregator {
-    /**
-     * Called once per aggregator before calling accumulate for the first time.
-     * 
-     * @param accessor
-     *            - Accessor to the data tuple.
-     * @param tIndex
-     *            - Index of the tuple in the accessor.
-     * @throws HyracksDataException
-     */
-    public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
-    /**
-     * Called once per tuple that belongs to this group.
-     * 
-     * @param accessor
-     *            - Accessor to data tuple.
-     * @param tIndex
-     *            - Index of tuple in the accessor.
-     * @throws HyracksDataException
-     */
-    public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
-    /**
-     * Called finally to emit output.
-     * 
-     * @param resultAcceptor
-     *            - Interface to write the result to.
-     * @throws HyracksDataException
-     */
-    public void output(DataOutput resultAcceptor) throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
deleted file mode 100644
index 1b7da7f..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.Serializable;
-
-public interface IFieldValueResultingAggregatorFactory extends Serializable {
-    public IFieldValueResultingAggregator createFieldValueResultingAggregator();
-
-    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator();
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
deleted file mode 100644
index d52f458..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ISpillableFieldValueResultingAggregator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An extended version of the {@link IFieldValueResultingAggregator} supporting
- * external aggregation.
- */
-public interface ISpillableFieldValueResultingAggregator extends IFieldValueResultingAggregator {
-
-    /**
-     * Called once per aggregator before calling accumulate for the first time.
-     * 
-     * @param accessor
-     *            - Accessor to the data tuple.
-     * @param tIndex
-     *            - Index of the tuple in the accessor.
-     * @throws HyracksDataException
-     */
-    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int fIndex) throws HyracksDataException;
-
-    /**
-     * Aggregate another partial result.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @param fIndex
-     * @throws HyracksDataException
-     */
-    public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int fIndex)
-            throws HyracksDataException;
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
deleted file mode 100644
index 770d36b..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-    private final int aggField;
-    private int outField = -1;
-
-    public IntSumAggregatorDescriptorFactory(int aggField) {
-        this.aggField = aggField;
-    }
-
-    public IntSumAggregatorDescriptorFactory(int aggField, int outField) {
-        this.aggField = aggField;
-        this.outField = outField;
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
-
-        if (this.outField < 0) {
-            this.outField = keyFields.length;
-        }
-
-        return new IAggregatorDescriptor() {
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int sum = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-
-                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
-            }
-
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-                    throws HyracksDataException {
-                int sum = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
-                // Update the value of tuple 2
-                ByteBuffer buf = ByteBuffer.wrap(data);
-                sum += buf.getInt(offset);
-                buf.putInt(offset, sum);
-                return 4;
-            }
-
-            @Override
-            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
-                try {
-                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException("Failed to write int sum as a partial result.");
-                }
-            }
-
-            @Override
-            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
-
-                try {
-                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
-                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
-                    tupleBuilder.addFieldEndOffset();
-                } catch (IOException e) {
-                    throw new HyracksDataException("Failed to write int sum as a partial result.");
-                }
-            }
-
-            @Override
-            public void reset() {
-
-            }
-        };
-    }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
deleted file mode 100644
index 0db3511..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-
-/**
- * Min/Max aggregator factory
- */
-public class MinMaxAggregatorFactory implements
-        IFieldValueResultingAggregatorFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * indicate the type of the value: true: max false: min
-     */
-    private boolean type;
-
-    /**
-     * The field to be aggregated.
-     */
-    private int field;
-
-    public MinMaxAggregatorFactory(boolean type, int field) {
-        this.type = type;
-        this.field = field;
-    }
-
-    @Override
-    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-        return new IFieldValueResultingAggregator() {
-
-            private float minmax;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeFloat(minmax);
-                } catch (IOException ex) {
-                    throw new HyracksDataException(ex);
-                }
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                minmax = 0;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, field);
-                float nval = FloatSerializerDeserializer.getFloat(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                if ((type ? (nval > minmax) : (nval < minmax))) {
-                    minmax = nval;
-                }
-            }
-        };
-    }
-
-    @Override
-    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-        return new ISpillableFieldValueResultingAggregator() {
-
-            private float minmax;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeFloat(minmax);
-                } catch (IOException ex) {
-                    throw new HyracksDataException(ex);
-                }
-            }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                minmax = FloatSerializerDeserializer.getFloat(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                minmax = 0;
-            }
-
-            @Override
-            public void accumulatePartialResult(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                minmax = FloatSerializerDeserializer.getFloat(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, field);
-                float nval = FloatSerializerDeserializer.getFloat(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                if ((type ? (nval > minmax) : (nval < minmax))) {
-                    minmax = nval;
-                }
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
deleted file mode 100644
index 053b9e2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-    private final IAggregatorDescriptorFactory[] aggregatorFactories;
-
-    public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
-        this.aggregatorFactories = aggregatorFactories;
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx,
-            final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
-            throws HyracksDataException {
-
-        final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
-        for (int i = 0; i < aggregators.length; i++) {
-            aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
-                    keyFields);
-        }
-
-        return new IAggregatorDescriptor() {
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].init(accessor, tIndex, tupleBuilder);
-                }
-            }
-
-            @Override
-            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
-                    throws HyracksDataException {
-                int adjust = 0;
-                for (int i = 0; i < aggregators.length; i++) {
-                    adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
-                }
-                return adjust;
-            }
-
-            @Override
-            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
-                }
-            }
-
-            @Override
-            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
-                }
-            }
-
-            @Override
-            public void close() {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].close();
-                }
-            }
-
-            @Override
-            public void reset() {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].reset();
-                }
-            }
-
-        };
-    }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
deleted file mode 100644
index 8237110..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-
-public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    private IFieldValueResultingAggregatorFactory[] aFactories;
-
-    public MultiAggregatorFactory(IFieldValueResultingAggregatorFactory[] aFactories) {
-        this.aFactories = aFactories;
-    }
-
-    @Override
-    public IAccumulatingAggregator createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
-        for (int i = 0; i < aFactories.length; ++i) {
-            aggregators[i] = aFactories[i].createFieldValueResultingAggregator();
-        }
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        return new IAccumulatingAggregator() {
-
-            private boolean pending;
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                tb.reset();
-                for (int i = 0; i < aggregators.length; ++i) {
-                    aggregators[i].init(accessor, tIndex);
-                }
-                pending = false;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; ++i) {
-                    aggregators[i].accumulate(accessor, tIndex);
-                }
-            }
-
-            @Override
-            public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
-                    int[] keyFieldIndexes) throws HyracksDataException {
-                if (!pending) {
-                    tb.reset();
-                    for (int i = 0; i < keyFieldIndexes.length; ++i) {
-                        tb.addField(accessor, tIndex, keyFieldIndexes[i]);
-                    }
-                    DataOutput dos = tb.getDataOutput();
-                    for (int i = 0; i < aggregators.length; ++i) {
-                        aggregators[i].output(dos);
-                        tb.addFieldEndOffset();
-                    }
-                }
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    pending = true;
-                    return false;
-                }
-                return true;
-            }
-
-        };
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
deleted file mode 100644
index 0817263..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-
-/**
- * SUM aggregator factory (for integer only; another SUM aggregator for floats
- * is available at {@link FloatSumAggregatorFactory})
- */
-public class SumAggregatorFactory implements
-        IFieldValueResultingAggregatorFactory {
-
-    private int sumField;
-
-    public SumAggregatorFactory(int field) {
-        sumField = field;
-    }
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.
-     * IFieldValueResultingAggregatorFactory
-     * #createFieldValueResultingAggregator()
-     */
-    @Override
-    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
-        return new IFieldValueResultingAggregator() {
-            private int sum;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeInt(sum);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                sum = 0;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-            }
-        };
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.spillable.
-     * ISpillableFieldValueResultingAggregatorFactory
-     * #createFieldValueResultingAggregator()
-     */
-    @Override
-    public ISpillableFieldValueResultingAggregator createSpillableFieldValueResultingAggregator() {
-        return new ISpillableFieldValueResultingAggregator() {
-
-            private int sum;
-
-            @Override
-            public void output(DataOutput resultAcceptor)
-                    throws HyracksDataException {
-                try {
-                    resultAcceptor.writeInt(sum);
-                } catch (IOException ex) {
-                    throw new HyracksDataException(ex);
-                }
-            }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                sum = IntegerSerializerDeserializer.getInt(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                sum = 0;
-            }
-
-            @Override
-            public void accumulatePartialResult(IFrameTupleAccessor accessor,
-                    int tIndex, int fIndex) throws HyracksDataException {
-                sum += IntegerSerializerDeserializer.getInt(
-                        accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex)
-                                + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(tIndex, fIndex));
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, sumField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 4cda69e..275bc56 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.io.Serializable;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 4ac72f6..07da19b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -90,11 +90,6 @@
     }
 
     @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
     public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
         if (index != 0) {
             throw new IllegalArgumentException();
@@ -121,4 +116,10 @@
         }
         return 0;
     }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index b829f5b..7bef7a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -39,14 +39,11 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
@@ -55,7 +52,11 @@
 import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
+/**
+ *
+ */
 public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
     private static final int AGGREGATE_ACTIVITY_ID = 0;
 
     private static final int MERGE_ACTIVITY_ID = 1;
@@ -64,16 +65,22 @@
     private final int[] keyFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
     private final IAggregatorDescriptorFactory aggregatorFactory;
-    private final IAggregatorDescriptorFactory mergeFactory;
+    private final IAggregatorDescriptorFactory mergerFactory;
+
     private final int framesLimit;
     private final ISpillableTableFactory spillableTableFactory;
     private final boolean isOutputSorted;
 
-    public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergeFactory,
-            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+    public ExternalGroupOperatorDescriptor(JobSpecification spec,
+            int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor recordDescriptor,
+            ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         if (framesLimit <= 1) {
@@ -81,11 +88,12 @@
              * Minimum of 2 frames: 1 for input records, and 1 for output
              * aggregation results.
              */
-            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+            throw new IllegalStateException(
+                    "frame limit should at least be 2, but it is "
+                            + framesLimit + "!");
         }
-
         this.aggregatorFactory = aggregatorFactory;
-        this.mergeFactory = mergeFactory;
+        this.mergerFactory = mergerFactory;
         this.keyFields = keyFields;
         this.comparatorFactories = comparatorFactories;
         this.firstNormalizerFactory = firstNormalizerFactory;
@@ -93,17 +101,25 @@
         this.isOutputSorted = isOutputSorted;
 
         /**
-         * Set the record descriptor. Note that since
-         * this operator is a unary operator,
-         * only the first record descriptor is used here.
+         * Set the record descriptor. Note that since this operator is a unary
+         * operator, only the first record descriptor is used here.
          */
         recordDescriptors[0] = recordDescriptor;
     }
 
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+     */
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
-        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
+                getOperatorId(), AGGREGATE_ACTIVITY_ID));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
+                MERGE_ACTIVITY_ID));
 
         builder.addActivity(aggregateAct);
         builder.addSourceEdge(0, aggregateAct, 0);
@@ -145,28 +161,35 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions)
                 throws HyracksDataException {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(
+                    ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(
+                            getOperatorId(), 0));
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private AggregateActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new AggregateActivityState(ctx.getJobletContext()
+                            .getJobId(), new TaskId(getActivityId(), partition));
                     state.runs = new LinkedList<RunFileReader>();
-                    state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+                    state.gTable = spillableTableFactory.buildSpillableTable(
+                            ctx, keyFields, comparatorFactories,
                             firstNormalizerFactory, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                            recordDescProvider.getInputRecordDescriptor(
+                                    getOperatorId(), 0), recordDescriptors[0],
                             ExternalGroupOperatorDescriptor.this.framesLimit);
                     state.gTable.reset();
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                public void nextFrame(ByteBuffer buffer)
+                        throws HyracksDataException {
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; i++) {
@@ -206,12 +229,15 @@
                 private void flushFramesToRun() throws HyracksDataException {
                     FileReference runFile;
                     try {
-                        runFile = ctx.getJobletContext().createManagedWorkspaceFile(
-                                ExternalGroupOperatorDescriptor.class.getSimpleName());
+                        runFile = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(
+                                        ExternalGroupOperatorDescriptor.class
+                                                .getSimpleName());
                     } catch (IOException e) {
                         throw new HyracksDataException(e);
                     }
-                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+                    RunFileWriter writer = new RunFileWriter(runFile,
+                            ctx.getIOManager());
                     writer.open();
                     try {
                         state.gTable.sortFrames();
@@ -237,15 +263,28 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider,
+                final int partition, int nPartitions)
                 throws HyracksDataException {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
+                comparators[i] = comparatorFactories[i]
+                        .createBinaryComparator();
             }
-            final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
-                    recordDescriptors[0], recordDescriptors[0], keyFields);
+
+            int[] keyFieldsInPartialResults = new int[keyFields.length];
+            for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+                keyFieldsInPartialResults[i] = i;
+            }
+            
+            final IAggregatorDescriptor aggregator = mergerFactory
+                    .createAggregator(ctx, recordDescriptors[0],
+                            recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+            final AggregateState aggregateState = aggregator
+                    .createAggregateStates();
+
             final int[] storedKeys = new int[keyFields.length];
             /**
              * Get the list of the fields in the stored records.
@@ -253,10 +292,6 @@
             for (int i = 0; i < keyFields.length; ++i) {
                 storedKeys[i] = i;
             }
-            /**
-             * Tuple builder
-             */
-            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 /**
@@ -280,15 +315,17 @@
 
                 private int[] currentFrameIndexInRun;
                 private int[] currentRunFrames;
-                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                        recordDescriptors[0]);
-                private ArrayTupleBuilder finalTupleBuilder;
+                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
+                        ctx.getFrameSize());
+                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
+                        ctx.getFrameSize(), recordDescriptors[0]);
                 private FrameTupleAppender writerFrameAppender;
 
                 public void initialize() throws HyracksDataException {
-                    aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
-                            AGGREGATE_ACTIVITY_ID), partition));
+                    aggState = (AggregateActivityState) ctx
+                            .getTaskState(new TaskId(new ActivityId(
+                                    getOperatorId(), AGGREGATE_ACTIVITY_ID),
+                                    partition));
                     runs = aggState.runs;
                     writer.open();
                     try {
@@ -318,11 +355,13 @@
                         writer.fail();
                         throw new HyracksDataException(e);
                     } finally {
+                        aggregateState.close();
                         writer.close();
                     }
                 }
 
-                private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+                private void doPass(LinkedList<RunFileReader> runs)
+                        throws HyracksDataException {
                     FileReference newRun = null;
                     IFrameWriter writer = this.writer;
                     boolean finalPass = false;
@@ -337,8 +376,10 @@
                         runNumber = runs.size();
                     } else {
                         runNumber = framesLimit - 2;
-                        newRun = ctx.getJobletContext().createManagedWorkspaceFile(
-                                ExternalGroupOperatorDescriptor.class.getSimpleName());
+                        newRun = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(
+                                        ExternalGroupOperatorDescriptor.class
+                                                .getSimpleName());
                         writer = new RunFileWriter(newRun, ctx.getIOManager());
                         writer.open();
                     }
@@ -346,14 +387,16 @@
                         currentFrameIndexInRun = new int[runNumber];
                         currentRunFrames = new int[runNumber];
                         /**
-                         * Create file readers for each input run file, only
-                         * for the ones fit into the inFrames
+                         * Create file readers for each input run file, only for
+                         * the ones fit into the inFrames
                          */
                         RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
+                                .size()];
                         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
-                                recordDescriptors[0], runNumber, comparator);
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
+                                ctx.getFrameSize(), recordDescriptors[0],
+                                runNumber, comparator);
                         /**
                          * current tuple index in each run
                          */
@@ -366,18 +409,25 @@
                             runFileReaders[runIndex].open();
 
                             currentRunFrames[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+                            currentFrameIndexInRun[runIndex] = runIndex
+                                    * runFrameLimit;
                             for (int j = 0; j < runFrameLimit; j++) {
-                                int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                                if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+                                int frameIndex = currentFrameIndexInRun[runIndex]
+                                        + j;
+                                if (runFileReaders[runIndex].nextFrame(inFrames
+                                        .get(frameIndex))) {
+                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(
+                                            ctx.getFrameSize(),
                                             recordDescriptors[0]);
-                                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                                    tupleAccessors[frameIndex].reset(inFrames
+                                            .get(frameIndex));
                                     currentRunFrames[runIndex]++;
                                     if (j == 0)
-                                        setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
+                                        setNextTopTuple(runIndex, tupleIndices,
+                                                runFileReaders, tupleAccessors,
                                                 topTuples);
                                 } else {
+                                    closeRun(runIndex, runFileReaders, tupleAccessors);
                                     break;
                                 }
                             }
@@ -395,52 +445,49 @@
                             int runIndex = topTuples.peek().getRunid();
                             FrameTupleAccessor fta = top.getAccessor();
 
-                            int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                            int currentTupleInOutFrame = outFrameAccessor
+                                    .getTupleCount() - 1;
                             if (currentTupleInOutFrame < 0
-                                    || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+                                    || compareFrameTuples(fta, tupleIndex,
+                                            outFrameAccessor,
+                                            currentTupleInOutFrame) != 0) {
                                 /**
-                                 * If a new group comes
-                                 * Initialize the first output record
-                                 * Reset the tuple builder
+                                 * Initialize the first output record Reset the
+                                 * tuple builder
                                  */
-                                tupleBuilder.reset();
-                                for (int i = 0; i < keyFields.length; i++) {
-                                    tupleBuilder.addField(fta, tupleIndex, i);
-                                }
-
-                                currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
-                                if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
-                                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                                if (!aggregator.initFromPartial(outFrameAppender, fta,
+                                        tupleIndex, aggregateState)) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
-                                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+                                    if (!aggregator.initFromPartial(outFrameAppender, fta,
+                                            tupleIndex, aggregateState)) {
                                         throw new HyracksDataException(
                                                 "Failed to append an aggregation result to the output frame.");
+                                    }
                                 }
+
                             } else {
                                 /**
                                  * if new tuple is in the same group of the
-                                 * current aggregator
-                                 * do merge and output to the outFrame
+                                 * current aggregator do merge and output to the
+                                 * outFrame
                                  */
-                                int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
-                                int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
-                                        keyFields.length);
-                                int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
-                                        keyFields.length);
-                                currentWorkingAggregator.aggregate(fta, tupleIndex, outFrameAccessor.getBuffer()
-                                        .array(), tupleOffset + outFrameAccessor.getFieldSlotsLength() + fieldOffset,
-                                        fieldLength);
+
+                                aggregator.aggregate(fta, tupleIndex,
+                                        outFrameAccessor,
+                                        currentTupleInOutFrame, aggregateState);
+
                             }
                             tupleIndices[runIndex]++;
-                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+                            setNextTopTuple(runIndex, tupleIndices,
+                                    runFileReaders, tupleAccessors, topTuples);
                         }
 
                         if (outFrameAppender.getTupleCount() > 0) {
                             flushOutFrame(writer, finalPass);
                         }
 
-                        currentWorkingAggregator.close();
+                        aggregator.close();
+                        
                         runs.subList(0, runNumber).clear();
                         /**
                          * insert the new run file into the beginning of the run
@@ -456,36 +503,38 @@
                     }
                 }
 
-                private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-                    if (finalTupleBuilder == null) {
-                        finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
-                    }
+                private void flushOutFrame(IFrameWriter writer, boolean isFinal)
+                        throws HyracksDataException {
                     if (writerFrame == null) {
                         writerFrame = ctx.allocateFrame();
                     }
                     if (writerFrameAppender == null) {
-                        writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                        writerFrameAppender = new FrameTupleAppender(
+                                ctx.getFrameSize());
                         writerFrameAppender.reset(writerFrame, true);
                     }
                     outFrameAccessor.reset(outFrame);
-                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-                        finalTupleBuilder.reset();
-                        for (int j = 0; j < keyFields.length; j++) {
-                            finalTupleBuilder.addField(outFrameAccessor, i, j);
-                        }
-                        if (isFinal)
-                            currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
-                        else
-                            currentWorkingAggregator.outputPartialResult(outFrameAccessor, i, finalTupleBuilder);
 
-                        if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
-                                finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                            FrameUtils.flushFrame(writerFrame, writer);
-                            writerFrameAppender.reset(writerFrame, true);
-                            if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
-                                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
-                                throw new HyracksDataException(
-                                        "Failed to write final aggregation result to a writer frame!");
+                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+                        
+                        if(isFinal){
+                            if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                                }
+                            }
+                        } else {
+                            if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                                }
+                            }
                         }
                     }
                     if (writerFrameAppender.getTupleCount() > 0) {
@@ -495,12 +544,15 @@
                     outFrameAppender.reset(outFrame, true);
                 }
 
-                private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+                private void setNextTopTuple(int runIndex, int[] tupleIndices,
+                        RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors,
+                        ReferencedPriorityQueue topTuples)
                         throws HyracksDataException {
                     int runStart = runIndex * runFrameLimit;
                     boolean existNext = false;
-                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
+                            || runCursors[runIndex] == null) {
                         /**
                          * run already closed
                          */
@@ -510,7 +562,8 @@
                          * not the last frame for this run
                          */
                         existNext = true;
-                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
+                                .getTupleCount()) {
                             tupleIndices[runIndex] = 0;
                             currentFrameIndexInRun[runIndex]++;
                         }
@@ -533,13 +586,15 @@
                          */
                         currentRunFrames[runIndex] = 0;
                         for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
-                            ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
+                            ByteBuffer buffer = tupleAccessors[frameOffset]
+                                    .getBuffer();
                             if (runCursors[runIndex].nextFrame(buffer)) {
                                 tupleAccessors[frameOffset].reset(buffer);
                                 if (tupleAccessors[frameOffset].getTupleCount() > 0) {
                                     existNext = true;
                                 } else {
-                                    throw new IllegalStateException("illegal: empty run file");
+                                    throw new IllegalStateException(
+                                            "illegal: empty run file");
                                 }
                                 currentRunFrames[runIndex]++;
                             } else {
@@ -549,8 +604,10 @@
                     }
 
                     if (existNext) {
-                        topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
-                                tupleIndices[runIndex]);
+                        topTuples
+                                .popAndReplace(
+                                        tupleAccessors[currentFrameIndexInRun[runIndex]],
+                                        tupleIndices[runIndex]);
                     } else {
                         topTuples.pop();
                         closeRun(runIndex, runCursors, tupleAccessors);
@@ -566,23 +623,28 @@
                  * @param tupleAccessor
                  * @throws HyracksDataException
                  */
-                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+                private void closeRun(int index, RunFileReader[] runCursors,
+                        IFrameTupleAccessor[] tupleAccessor)
                         throws HyracksDataException {
                     if (runCursors[index] != null) {
                         runCursors[index].close();
                         runCursors[index] = null;
+                        tupleAccessor[index] = null;
                     }
                 }
 
-                private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+                private int compareFrameTuples(IFrameTupleAccessor fta1,
+                        int j1, IFrameTupleAccessor fta2, int j2) {
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
                         int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                        int s1 = fta1.getTupleStartOffset(j1)
+                                + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
                         int l1 = fta1.getFieldLength(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                        int s2 = fta2.getTupleStartOffset(j2)
+                                + fta2.getFieldSlotsLength()
                                 + fta2.getFieldStartOffset(j2, fIdx);
                         int l2_start = fta2.getFieldStartOffset(j2, fIdx);
                         int l2_end = fta2.getFieldEndOffset(j2, fIdx);
@@ -598,25 +660,32 @@
             return op;
         }
 
-        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        private Comparator<ReferenceEntry> createEntryComparator(
+                final IBinaryComparator[] comparators) {
             return new Comparator<ReferenceEntry>() {
 
                 @Override
                 public int compare(ReferenceEntry o1, ReferenceEntry o2) {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
+                            .getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
+                            .getAccessor();
                     int j1 = o1.getTupleIndex();
                     int j2 = o2.getTupleIndex();
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
                         int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                        int s1 = fta1.getTupleStartOffset(j1)
+                                + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx)
+                                - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2)
+                                + fta2.getFieldSlotsLength()
                                 + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx)
+                                - fta2.getFieldStartOffset(j2, fIdx);
                         int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
                         if (c != 0) {
                             return c;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index 5733c11..b1af426 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -34,7 +34,8 @@
 
 class GroupingHashTable {
     /**
-     * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * The pointers in the link store 3 int values for each entry in the
+     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
      * 
      * @author vinayakb
      */
@@ -59,53 +60,69 @@
         }
     }
 
-    private static final int INIT_ACCUMULATORS_SIZE = 8;
+    private static final int INIT_AGG_STATE_SIZE = 8;
     private final IHyracksTaskContext ctx;
     private final FrameTupleAppender appender;
     private final List<ByteBuffer> buffers;
     private final Link[] table;
-    private IAccumulatingAggregator[] accumulators;
+    /**
+     * Aggregate states: a list of states for all groups maintained in the main
+     * memory.
+     */
+    private AggregateState[] aggregateStates;
     private int accumulatorSize;
 
     private int lastBIndex;
-    private final int[] fields;
     private final int[] storedKeys;
     private final IBinaryComparator[] comparators;
     private final FrameTuplePairComparator ftpc;
     private final ITuplePartitionComputer tpc;
-    private final IAccumulatingAggregatorFactory aggregatorFactory;
-    private final RecordDescriptor inRecordDescriptor;
-    private final RecordDescriptor outRecordDescriptor;
+    private final IAggregatorDescriptor aggregator;
 
     private final FrameTupleAccessor storedKeysAccessor;
 
-    GroupingHashTable(IHyracksTaskContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
-            ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+    GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+            IBinaryComparatorFactory[] comparatorFactories,
+            ITuplePartitionComputerFactory tpcf,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int tableSize)
+            throws HyracksDataException {
         this.ctx = ctx;
         appender = new FrameTupleAppender(ctx.getFrameSize());
         buffers = new ArrayList<ByteBuffer>();
         table = new Link[tableSize];
-        accumulators = new IAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
-        accumulatorSize = 0;
-        this.fields = fields;
+
         storedKeys = new int[fields.length];
+        @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
         for (int i = 0; i < fields.length; ++i) {
             storedKeys[i] = i;
             storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
         }
+
         comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
         tpc = tpcf.createPartitioner();
-        this.aggregatorFactory = aggregatorFactory;
-        this.inRecordDescriptor = inRecordDescriptor;
-        this.outRecordDescriptor = outRecordDescriptor;
-        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
-        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+
+        int[] keyFieldsInPartialResults = new int[fields.length];
+        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+            keyFieldsInPartialResults[i] = i;
+        }
+        
+        this.aggregator = aggregatorFactory.createAggregator(ctx,
+                inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
+
+        this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+        accumulatorSize = 0;
+
+        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+                storedKeySerDeser);
+        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                storedKeysRecordDescriptor);
         lastBIndex = -1;
         addNewBuffer();
     }
@@ -119,7 +136,8 @@
         ++lastBIndex;
     }
 
-    private void flushFrame(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+    private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+            throws HyracksDataException {
         ByteBuffer frame = appender.getBuffer();
         frame.position(0);
         frame.limit(frame.capacity());
@@ -127,44 +145,56 @@
         appender.reset(appender.getBuffer(), true);
     }
 
-    void insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+    void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
         int entry = tpc.partition(accessor, tIndex, table.length);
         Link link = table[entry];
         if (link == null) {
             link = table[entry] = new Link();
         }
-        IAccumulatingAggregator aggregator = null;
+        int saIndex = -1;
         for (int i = 0; i < link.size; i += 3) {
             int sbIndex = link.pointers[i];
             int stIndex = link.pointers[i + 1];
-            int saIndex = link.pointers[i + 2];
             storedKeysAccessor.reset(buffers.get(sbIndex));
             int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
             if (c == 0) {
-                aggregator = accumulators[saIndex];
+                saIndex = link.pointers[i + 2];
                 break;
             }
         }
-        if (aggregator == null) {
+        if (saIndex < 0) {
             // Did not find the key. Insert a new entry.
-            if (!appender.appendProjection(accessor, tIndex, fields)) {
+            saIndex = accumulatorSize++;
+            // Add keys
+
+            // Add index to the keys in frame
+            int sbIndex = lastBIndex;
+            int stIndex = appender.getTupleCount();
+            
+            // Add aggregation fields
+            AggregateState newState = aggregator.createAggregateStates();
+            
+            if(!aggregator.init(appender, accessor, tIndex, newState)){
                 addNewBuffer();
-                if (!appender.appendProjection(accessor, tIndex, fields)) {
+                sbIndex = lastBIndex;
+                stIndex = appender.getTupleCount();
+                if(!aggregator.init(appender, accessor, tIndex, newState)){
                     throw new IllegalStateException();
                 }
             }
-            int sbIndex = lastBIndex;
-            int stIndex = appender.getTupleCount() - 1;
-            if (accumulatorSize >= accumulators.length) {
-                accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
+            
+            if (accumulatorSize >= aggregateStates.length) {
+                aggregateStates = Arrays.copyOf(aggregateStates,
+                        aggregateStates.length * 2);
             }
-            int saIndex = accumulatorSize++;
-            aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
-                    outRecordDescriptor);
-            aggregator.init(accessor, tIndex);
+
+            aggregateStates[saIndex] = newState;
+
             link.add(sbIndex, stIndex, saIndex);
+        } else {
+            aggregator.aggregate(accessor, tIndex, null, 0,
+                    aggregateStates[saIndex]);
         }
-        aggregator.accumulate(accessor, tIndex);
     }
 
     void write(IFrameWriter writer) throws HyracksDataException {
@@ -179,8 +209,10 @@
                     int aIndex = link.pointers[j + 2];
                     ByteBuffer keyBuffer = buffers.get(bIndex);
                     storedKeysAccessor.reset(keyBuffer);
-                    IAccumulatingAggregator aggregator = accumulators[aIndex];
-                    while (!aggregator.output(appender, storedKeysAccessor, tIndex, storedKeys)) {
+
+                    while (!aggregator
+                            .outputFinalResult(appender, storedKeysAccessor,
+                                    tIndex, aggregateStates[aIndex])) {
                         flushFrame(appender, writer);
                     }
                 }
@@ -190,4 +222,10 @@
             flushFrame(appender, writer);
         }
     }
+    
+    void close() throws HyracksDataException {
+        for(AggregateState aState : aggregateStates){
+            aState.close();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index d0cd895..c089dd4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -38,7 +38,11 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
+/**
+ *
+ */
 public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
     private static final int HASH_BUILD_ACTIVITY_ID = 0;
 
     private static final int OUTPUT_ACTIVITY_ID = 1;
@@ -48,27 +52,40 @@
     private final int[] keys;
     private final ITuplePartitionComputerFactory tpcf;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IAccumulatingAggregatorFactory aggregatorFactory;
+
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+
     private final int tableSize;
 
-    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys, ITuplePartitionComputerFactory tpcf,
-            IBinaryComparatorFactory[] comparatorFactories, IAccumulatingAggregatorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor, int tableSize) {
+    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
+            ITuplePartitionComputerFactory tpcf,
+            IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor outRecordDescriptor, int tableSize) {
         super(spec, 1, 1);
         this.keys = keys;
         this.tpcf = tpcf;
         this.comparatorFactories = comparatorFactories;
         this.aggregatorFactory = aggregatorFactory;
-        recordDescriptors[0] = recordDescriptor;
+        recordDescriptors[0] = outRecordDescriptor;
         this.tableSize = tableSize;
     }
 
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
+     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
+     */
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
+        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
+                HASH_BUILD_ACTIVITY_ID));
         builder.addActivity(ha);
 
-        OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
+        OutputActivity oa = new OutputActivity(new ActivityId(odId,
+                OUTPUT_ACTIVITY_ID));
         builder.addActivity(oa);
 
         builder.addSourceEdge(0, ha, 0);
@@ -105,28 +122,40 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider,
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider,
                 final int partition, int nPartitions) {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(
+                    ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(
+                            getOperatorId(), 0));
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
-                    state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                    state = new HashBuildActivityState(ctx.getJobletContext()
+                            .getJobId(), new TaskId(getActivityId(), partition));
+                    state.table = new GroupingHashTable(ctx, keys,
+                            comparatorFactories, tpcf, aggregatorFactory,
+                            recordDescProvider.getInputRecordDescriptor(
+                                    getOperatorId(), 0), recordDescriptors[0],
                             tableSize);
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                public void nextFrame(ByteBuffer buffer)
+                        throws HyracksDataException {
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; ++i) {
-                        state.table.insert(accessor, i);
+                        try {
+                            state.table.insert(accessor, i);
+                        } catch (Exception e) {
+                            System.out.println(e.toString());
+                            throw new HyracksDataException(e);
+                        }
                     }
                 }
 
@@ -137,6 +166,8 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    throw new HyracksDataException(
+                            "HashGroupOperator is failed.");
                 }
             };
         }
@@ -150,13 +181,17 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+        public IOperatorNodePushable createPushRuntime(
+                final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider,
                 final int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
-                            new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
+                    HashBuildActivityState buildState = (HashBuildActivityState) ctx
+                            .getTaskState(new TaskId(new ActivityId(
+                                    getOperatorId(), HASH_BUILD_ACTIVITY_ID),
+                                    partition));
                     GroupingHashTable table = buildState.table;
                     writer.open();
                     try {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
deleted file mode 100644
index b20a93d..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
-    private static final long serialVersionUID = 1L;
-    private final ITuplePartitionComputerFactory tpcf;
-    private final int tableSize;
-
-    public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
-        this.tpcf = tpcf;
-        this.tableSize = tableSize;
-    }
-
-    @Override
-    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
-            final IBinaryComparatorFactory[] comparatorFactories,
-            final INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-            final IAggregatorDescriptorFactory aggregateDescriptorFactory, final RecordDescriptor inRecordDescriptor,
-            final RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
-        final int[] storedKeys = new int[keyFields.length];
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
-        for (int i = 0; i < keyFields.length; i++) {
-            storedKeys[i] = i;
-            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
-        }
-
-        RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
-        final FrameTupleAccessor storedKeysAccessor1;
-        final FrameTupleAccessor storedKeysAccessor2;
-        if (keyFields.length >= outRecordDescriptor.getFields().length) {
-            // for the case of zero-aggregations
-            ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
-            ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
-            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
-            for (int i = 0; i < fields.length; i++)
-                newFields[i] = fields[i];
-            ITypeTrait[] newTypes = null;
-            if (types != null) {
-                newTypes = new ITypeTrait[types.length + 1];
-                for (int i = 0; i < types.length; i++)
-                    newTypes[i] = types[i];
-            }
-            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
-        }
-        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
-        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
-
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-
-        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
-        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
-        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        final ITuplePartitionComputer tpc = tpcf.createPartitioner();
-        final ByteBuffer outFrame = ctx.allocateFrame();
-
-        final ArrayTupleBuilder internalTupleBuilder;
-        if (keyFields.length < outRecordDescriptor.getFields().length)
-            internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        else
-            internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
-        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
-                .createNormalizedKeyComputer();
-
-        return new ISpillableTable() {
-            private int dataFrameCount;
-            private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);;
-            private final TuplePointer storedTuplePointer = new TuplePointer();
-            private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-            private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
-                    inRecordDescriptor, outRecordDescriptor, keyFields);
-
-            /**
-             * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
-             * = Frame index in the "Frames" list, [1] = Tuple index in the
-             * frame, [2] = Poor man's normalized key for the tuple.
-             */
-            private int[] tPointers;
-
-            @Override
-            public void reset() {
-                dataFrameCount = -1;
-                tPointers = null;
-                table.reset();
-                aggregator.close();
-            }
-
-            @Override
-            public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                if (dataFrameCount < 0)
-                    nextAvailableFrame();
-                int entry = tpc.partition(accessor, tIndex, tableSize);
-                boolean foundGroup = false;
-                int offset = 0;
-                do {
-                    table.getTuplePointer(entry, offset++, storedTuplePointer);
-                    if (storedTuplePointer.frameIndex < 0)
-                        break;
-                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
-                    int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
-                    if (c == 0) {
-                        foundGroup = true;
-                        break;
-                    }
-                } while (true);
-
-                if (!foundGroup) {
-                    /**
-                     * If no matching group is found, create a new aggregator
-                     * Create a tuple for the new group
-                     */
-                    internalTupleBuilder.reset();
-                    for (int i = 0; i < keyFields.length; i++) {
-                        internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
-                    }
-                    aggregator.init(accessor, tIndex, internalTupleBuilder);
-                    if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
-                            internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                        if (!nextAvailableFrame()) {
-                            return false;
-                        } else {
-                            if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
-                                    internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
-                                throw new IllegalStateException("Failed to init an aggregator");
-                            }
-                        }
-                    }
-
-                    storedTuplePointer.frameIndex = dataFrameCount;
-                    storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
-                    table.insert(entry, storedTuplePointer);
-                } else {
-                    // If there is a matching found, do aggregation directly
-                    int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
-                    int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(storedTuplePointer.tupleIndex,
-                            keyFields.length);
-                    int tupleLength = storedKeysAccessor1.getFieldLength(storedTuplePointer.tupleIndex,
-                            keyFields.length);
-                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset
-                            + storedKeysAccessor1.getFieldSlotsLength() + aggFieldOffset, tupleLength);
-                }
-                return true;
-            }
-
-            @Override
-            public List<ByteBuffer> getFrames() {
-                return frames;
-            }
-
-            @Override
-            public int getFrameCount() {
-                return dataFrameCount;
-            }
-
-            @Override
-            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
-                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                writer.open();
-                appender.reset(outFrame, true);
-                if (tPointers == null) {
-                    // Not sorted
-                    for (int i = 0; i < tableSize; ++i) {
-                        int entry = i;
-                        int offset = 0;
-                        do {
-                            table.getTuplePointer(entry, offset++, storedTuplePointer);
-                            if (storedTuplePointer.frameIndex < 0)
-                                break;
-                            int bIndex = storedTuplePointer.frameIndex;
-                            int tIndex = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor1.reset(frames.get(bIndex));
-                            // Reset the tuple for the partial result
-                            outputTupleBuilder.reset();
-                            for (int k = 0; k < keyFields.length; k++) {
-                                outputTupleBuilder.addField(storedKeysAccessor1, tIndex, k);
-                            }
-                            if (isPartial)
-                                aggregator.outputPartialResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
-                            else
-                                aggregator.outputResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
-                            while (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
-                                    outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                                FrameUtils.flushFrame(outFrame, writer);
-                                appender.reset(outFrame, true);
-                            }
-                        } while (true);
-                    }
-                    if (appender.getTupleCount() != 0) {
-                        FrameUtils.flushFrame(outFrame, writer);
-                    }
-                    aggregator.close();
-                    return;
-                }
-                int n = tPointers.length / 3;
-                for (int ptr = 0; ptr < n; ptr++) {
-                    int tableIndex = tPointers[ptr * 3];
-                    int rowIndex = tPointers[ptr * 3 + 1];
-                    table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
-                    int frameIndex = storedTuplePointer.frameIndex;
-                    int tupleIndex = storedTuplePointer.tupleIndex;
-                    // Get the frame containing the value
-                    ByteBuffer buffer = frames.get(frameIndex);
-                    storedKeysAccessor1.reset(buffer);
-
-                    outputTupleBuilder.reset();
-                    for (int k = 0; k < keyFields.length; k++) {
-                        outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
-                    }
-                    if (isPartial)
-                        aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
-                    else
-                        aggregator.outputResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
-                    if (!appender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
-                            outputTupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outFrame, writer);
-                        appender.reset(outFrame, true);
-                        if (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
-                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                            throw new IllegalStateException();
-                        }
-                    }
-                }
-                if (appender.getTupleCount() > 0) {
-                    FrameUtils.flushFrame(outFrame, writer);
-                }
-                aggregator.close();
-            }
-
-            /**
-             * Set the working frame to the next available frame in the frame
-             * list. There are two cases:<br>
-             * 1) If the next frame is not initialized, allocate a new frame. 2)
-             * When frames are already created, they are recycled.
-             * 
-             * @return Whether a new frame is added successfully.
-             */
-            private boolean nextAvailableFrame() {
-                // Return false if the number of frames is equal to the limit.
-                if (dataFrameCount + 1 >= framesLimit)
-                    return false;
-
-                if (frames.size() < framesLimit) {
-                    // Insert a new frame
-                    ByteBuffer frame = ctx.allocateFrame();
-                    frame.position(0);
-                    frame.limit(frame.capacity());
-                    frames.add(frame);
-                    appender.reset(frame, true);
-                    dataFrameCount = frames.size() - 1;
-                } else {
-                    // Reuse an old frame
-                    dataFrameCount++;
-                    ByteBuffer frame = frames.get(dataFrameCount);
-                    frame.position(0);
-                    frame.limit(frame.capacity());
-                    appender.reset(frame, true);
-                }
-                return true;
-            }
-
-            @Override
-            public void sortFrames() {
-                int sfIdx = storedKeys[0];
-                int totalTCount = table.getTupleCount();
-                tPointers = new int[totalTCount * 3];
-                int ptr = 0;
-
-                for (int i = 0; i < tableSize; i++) {
-                    int entry = i;
-                    int offset = 0;
-                    do {
-                        table.getTuplePointer(entry, offset, storedTuplePointer);
-                        if (storedTuplePointer.frameIndex < 0)
-                            break;
-                        tPointers[ptr * 3] = entry;
-                        tPointers[ptr * 3 + 1] = offset;
-                        table.getTuplePointer(entry, offset, storedTuplePointer);
-                        int fIndex = storedTuplePointer.frameIndex;
-                        int tIndex = storedTuplePointer.tupleIndex;
-                        storedKeysAccessor1.reset(frames.get(fIndex));
-                        int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
-                        int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
-                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
-                        int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
-                        tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
-                                .array(), f0Start, f0EndRel - f0StartRel);
-                        ptr++;
-                        offset++;
-                    } while (true);
-                }
-                /**
-                 * Sort using quick sort
-                 */
-                if (tPointers.length > 0) {
-                    sort(tPointers, 0, totalTCount);
-                }
-            }
-
-            private void sort(int[] tPointers, int offset, int length) {
-                int m = offset + (length >> 1);
-                int mTable = tPointers[m * 3];
-                int mRow = tPointers[m * 3 + 1];
-                int mNormKey = tPointers[m * 3 + 2];
-
-                table.getTuplePointer(mTable, mRow, storedTuplePointer);
-                int mFrame = storedTuplePointer.frameIndex;
-                int mTuple = storedTuplePointer.tupleIndex;
-                storedKeysAccessor1.reset(frames.get(mFrame));
-
-                int a = offset;
-                int b = a;
-                int c = offset + length - 1;
-                int d = c;
-                while (true) {
-                    while (b <= c) {
-                        int bTable = tPointers[b * 3];
-                        int bRow = tPointers[b * 3 + 1];
-                        int bNormKey = tPointers[b * 3 + 2];
-                        int cmp = 0;
-                        if (bNormKey != mNormKey) {
-                            cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                        } else {
-                            table.getTuplePointer(bTable, bRow, storedTuplePointer);
-                            int bFrame = storedTuplePointer.frameIndex;
-                            int bTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(bFrame));
-                            cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
-                        }
-                        if (cmp > 0) {
-                            break;
-                        }
-                        if (cmp == 0) {
-                            swap(tPointers, a++, b);
-                        }
-                        ++b;
-                    }
-                    while (c >= b) {
-                        int cTable = tPointers[c * 3];
-                        int cRow = tPointers[c * 3 + 1];
-                        int cNormKey = tPointers[c * 3 + 2];
-                        int cmp = 0;
-                        if (cNormKey != mNormKey) {
-                            cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
-                        } else {
-                            table.getTuplePointer(cTable, cRow, storedTuplePointer);
-                            int cFrame = storedTuplePointer.frameIndex;
-                            int cTuple = storedTuplePointer.tupleIndex;
-                            storedKeysAccessor2.reset(frames.get(cFrame));
-                            cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
-                        }
-                        if (cmp < 0) {
-                            break;
-                        }
-                        if (cmp == 0) {
-                            swap(tPointers, c, d--);
-                        }
-                        --c;
-                    }
-                    if (b > c)
-                        break;
-                    swap(tPointers, b++, c--);
-                }
-
-                int s;
-                int n = offset + length;
-                s = Math.min(a - offset, b - a);
-                vecswap(tPointers, offset, b - s, s);
-                s = Math.min(d - c, n - d - 1);
-                vecswap(tPointers, b, n - s, s);
-
-                if ((s = b - a) > 1) {
-                    sort(tPointers, offset, s);
-                }
-                if ((s = d - c) > 1) {
-                    sort(tPointers, n - s, s);
-                }
-            }
-
-            private void swap(int x[], int a, int b) {
-                for (int i = 0; i < 3; ++i) {
-                    int t = x[a * 3 + i];
-                    x[a * 3 + i] = x[b * 3 + i];
-                    x[b * 3 + i] = t;
-                }
-            }
-
-            private void vecswap(int x[], int a, int b, int n) {
-                for (int i = 0; i < n; i++, a++, b++) {
-                    swap(x, a, b);
-                }
-            }
-
-            @Override
-            public void close() {
-                dataFrameCount = -1;
-                tPointers = null;
-                table.close();
-                frames.clear();
-            }
-        };
-    }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
similarity index 99%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index b20197d..af68afb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
deleted file mode 100644
index b0da898..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-public interface IAccumulatingAggregator {
-    /**
-     * Called once per aggregator before calling accumulate for the first time.
-     * 
-     * @param accessor
-     *            - Accessor to the data tuple.
-     * @param tIndex
-     *            - Index of the tuple in the accessor.
-     * @throws HyracksDataException
-     */
-    public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
-    /**
-     * Called once per tuple that belongs to this group.
-     * 
-     * @param accessor
-     *            - Accessor to data tuple.
-     * @param tIndex
-     *            - Index of tuple in the accessor.
-     * @throws HyracksDataException
-     */
-    public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
-
-    /**
-     * Called finally to emit output. This method is called until it returns true. The method is free to
-     * write out output to the provided appender until there is no more space and return false. It is the
-     * caller's responsibility to flush and make room in the appender before this method is called again.
-     * 
-     * @param appender
-     *            - Appender to write output to.
-     * @param accessor
-     *            - Accessor to access the key.
-     * @param tIndex
-     *            - Tuple index of the key in the accessor.
-     * @param keyFieldIndexes
-     *            - Field indexes of the key field.
-     * @return true if all output is written, false if the appender is full.
-     * @throws HyracksDataException
-     */
-    public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
-            throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
deleted file mode 100644
index 978f671..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IAccumulatingAggregatorFactory extends Serializable {
-    IAccumulatingAggregator createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor) throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
index 3851f26..858c760 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.io.Serializable;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
similarity index 98%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index dc2a30e..4167de1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a0802c8..339c29f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.io.Serializable;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
similarity index 98%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index c4df536..c42d29a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.io.DataOutput;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
index 0e2d530..ee35c5e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations;
+package edu.uci.ics.hyracks.dataflow.std.group;
 
 import java.io.Serializable;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
index 7bd8cd8..ff80a23 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -21,9 +21,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-/**
- * @author jarodwen
- */
 public interface ISpillableTable {
 
     public void close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 0ff1d1d..de9fac5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -21,15 +21,11 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
 
-/**
- * @author jarodwen
- */
 public interface ISpillableTableFactory extends Serializable {
     ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
             IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index c678ac1..8ff4942 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -29,15 +29,17 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
-public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class PreclusteredGroupOperatorDescriptor extends
+        AbstractSingleActivityOperatorDescriptor {
     private final int[] groupFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IAccumulatingAggregatorFactory aggregatorFactory;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
 
     private static final long serialVersionUID = 1L;
 
-    public PreclusteredGroupOperatorDescriptor(JobSpecification spec, int[] groupFields,
-            IBinaryComparatorFactory[] comparatorFactories, IAccumulatingAggregatorFactory aggregatorFactory,
+    public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
+            int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.groupFields = groupFields;
@@ -47,32 +49,40 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) throws HyracksDataException {
+    public IOperatorNodePushable createPushRuntime(
+            final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions) throws HyracksDataException {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-        final IAccumulatingAggregator aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
-                recordDescriptors[0]);
+        final RecordDescriptor inRecordDesc = recordDescProvider
+                .getInputRecordDescriptor(getOperatorId(), 0);
+        final IAggregatorDescriptor aggregator = aggregatorFactory
+                .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
+                        groupFields, groupFields);
         final ByteBuffer copyFrame = ctx.allocateFrame();
-        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
+                ctx.getFrameSize(), inRecordDesc);
         copyFrameAccessor.reset(copyFrame);
         ByteBuffer outFrame = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        final FrameTupleAppender appender = new FrameTupleAppender(
+                ctx.getFrameSize());
         appender.reset(outFrame, true);
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             private PreclusteredGroupWriter pgw;
 
             @Override
             public void open() throws HyracksDataException {
-                pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc, writer);
+                pgw = new PreclusteredGroupWriter(ctx, groupFields,
+                        comparators, aggregator, inRecordDesc, writer);
                 pgw.open();
             }
 
             @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            public void nextFrame(ByteBuffer buffer)
+                    throws HyracksDataException {
                 pgw.nextFrame(buffer);
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index df7f9f9..3f2efaf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -28,7 +28,8 @@
 public class PreclusteredGroupWriter implements IFrameWriter {
     private final int[] groupFields;
     private final IBinaryComparator[] comparators;
-    private final IAccumulatingAggregator aggregator;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
     private final IFrameWriter writer;
     private final ByteBuffer copyFrame;
     private final FrameTupleAccessor inFrameAccessor;
@@ -38,10 +39,11 @@
     private boolean first;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAccumulatingAggregator aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
         this.groupFields = groupFields;
         this.comparators = comparators;
         this.aggregator = aggregator;
+        this.aggregateState = aggregator.createAggregateStates();
         this.writer = writer;
         copyFrame = ctx.allocateFrame();
         inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
@@ -64,7 +66,7 @@
         int nTuples = inFrameAccessor.getTupleCount();
         for (int i = 0; i < nTuples; ++i) {
             if (first) {
-                aggregator.init(inFrameAccessor, i);
+                aggregator.init(null, inFrameAccessor, i, aggregateState);
                 first = false;
             } else {
                 if (i == 0) {
@@ -72,8 +74,8 @@
                 } else {
                     switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
                 }
+                
             }
-            aggregator.accumulate(inFrameAccessor, i);
         }
         FrameUtils.copy(buffer, copyFrame);
     }
@@ -82,16 +84,18 @@
             FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
         if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
             writeOutput(prevTupleAccessor, prevTupleIndex);
-            aggregator.init(currTupleAccessor, currTupleIndex);
+            aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
         }
     }
 
     private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
             throws HyracksDataException {
-        if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+        if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
             FrameUtils.flushFrame(appender.getBuffer(), writer);
             appender.reset(appender.getBuffer(), true);
-            if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+            if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
                 throw new IllegalStateException();
             }
         }
@@ -124,6 +128,7 @@
                 FrameUtils.flushFrame(appender.getBuffer(), writer);
             }
         }
+        aggregateState.close();
         writer.close();
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
index 3ecb968..d2d1b03 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -23,10 +23,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
 /**
  *
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
similarity index 93%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index a56c669..657af3b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -23,10 +23,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
 /**
  *
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
similarity index 94%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 0695c5a..6045687 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -23,10 +23,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
 /**
  *
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
similarity index 96%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 1ae7aa4..3970e57 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -26,10 +26,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
 /**
  *
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
similarity index 95%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 0ae62e5..11d236c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
 
 import java.io.DataOutput;
 
@@ -22,12 +22,12 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
 /**
  *
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index d8df3a8..753cf0c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -38,16 +38,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -58,6 +48,16 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 
 /**
  *
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index c0a1d10..1c96755 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -35,9 +35,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -47,7 +44,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 
@@ -78,7 +78,7 @@
                 spec,
                 new int[] { 0 },
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID);
 
@@ -89,8 +89,8 @@
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -147,7 +147,7 @@
                 spec,
                 new int[] { 0 },
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
@@ -158,8 +158,8 @@
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -216,7 +216,7 @@
                 spec,
                 new int[] { 0 },
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
@@ -227,8 +227,8 @@
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
deleted file mode 100644
index a134a39..0000000
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.tests.spillable;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.AvgAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.ConcatAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
-/**
- * 
- */
-public class ExternalAggregateTest extends AbstractIntegrationTest {
-
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                    "data/tpch0.001/lineitem.tbl"))) });
-
-    static final boolean isOutputFile = true;
-
-    final RecordDescriptor desc = new RecordDescriptor(
-            new ISerializerDeserializer[] {
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE });
-
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
-            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE, }, '|');
-
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, boolean isFile, String prefix)
-            throws IOException {
-        AbstractSingleActivityOperatorDescriptor printer;
-
-        if (!isOutputFile)
-            printer = new PrinterOperatorDescriptor(spec);
-        else
-            printer = new PlainFileWriterOperatorDescriptor(spec,
-                    new ConstantFileSplitProvider(new FileSplit[] {
-                            new FileSplit(NC1_ID, createTempFile()
-                                    .getAbsolutePath()),
-                            new FileSplit(NC2_ID, createTempFile()
-                                    .getAbsolutePath()) }), "\t");
-
-        return printer;
-    }
-
-    @Test
-    public void hashSingleKeyScalarGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new CountAggregatorDescriptorFactory(),
-                new IntSumAggregatorDescriptorFactory(keyFields.length),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashSingleKeyScalarGroupTest");
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void hashMultipleKeyScalarGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE, });
-
-        int[] keyFields = new int[] { 0, 9 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new IntSumAggregatorDescriptorFactory(1),
-                new IntSumAggregatorDescriptorFactory(keyFields.length),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashMultipleKeyScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE, });
-
-        int[] keyFields = new int[] { 0, 9 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiAggregatorDescriptorFactory(
-                        new IAggregatorDescriptorFactory[] {
-                                new IntSumAggregatorDescriptorFactory(1, 2),
-                                new IntSumAggregatorDescriptorFactory(2, 3) }),
-                new MultiAggregatorDescriptorFactory(
-                        new IAggregatorDescriptorFactory[] {
-                                new IntSumAggregatorDescriptorFactory(2, 2),
-                                new IntSumAggregatorDescriptorFactory(3, 3) }),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashMultipleKeyMultipleScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void hashMultipleKeyNonScalarGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new ConcatAggregatorDescriptorFactory(9),
-                new ConcatAggregatorDescriptorFactory(keyFields.length),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashMultipleKeyNonScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0, 9 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiAggregatorDescriptorFactory(
-                        new IAggregatorDescriptorFactory[] {
-                                new IntSumAggregatorDescriptorFactory(1, 2),
-                                new IntSumAggregatorDescriptorFactory(2, 3),
-                                new ConcatAggregatorDescriptorFactory(9, 4) }),
-                new MultiAggregatorDescriptorFactory(
-                        new IAggregatorDescriptorFactory[] {
-                                new IntSumAggregatorDescriptorFactory(2, 2),
-                                new IntSumAggregatorDescriptorFactory(3, 3),
-                                new ConcatAggregatorDescriptorFactory(4, 4) }),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashMultipleKeyMultipleFieldsGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    @Test
-    public void hashSingleKeyScalarAvgGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 3;
-        int tableSize = 8;
-
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new AvgAggregatorDescriptorFactory(1),
-                new AvgAggregatorDescriptorFactory(keyFields.length),
-                outputRec,
-                new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                isOutputFile, "hashSingleKeyScalarGroupTest");
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-}
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 61b4ad8..892ae39 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -42,18 +42,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -67,8 +55,12 @@
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
 /**
@@ -237,97 +229,8 @@
         AbstractOperatorDescriptor grouper;
 
         switch (alg) {
-        case 0: // External hash group
-            grouper = new ExternalGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    framesLimit,
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new IntegerNormalizedKeyComputerFactory(),
-                    new MultiAggregatorDescriptorFactory(
-                            new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
-                    new MultiAggregatorDescriptorFactory(
-                            new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
-                                    keys.length) }),
-                    outDesc,
-                    new HashSpillableGroupingTableFactory(
-                            new FieldHashPartitionComputerFactory(keys,
-                                    new IBinaryHashFunctionFactory[] {
-                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
-                            htSize), false);
-
-            createPartitionConstraint(spec, grouper, outSplits);
-
-            // Connect scanner with the grouper
-            IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
-            break;
-        case 1: // External sort + pre-cluster
-            ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                    spec, framesLimit, keys, new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
-            createPartitionConstraint(spec, sorter, inSplits);
-
-            // Connect scan operator with the sorter
-            IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
-
-            grouper = new PreclusteredGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new MultiAggregatorFactory(
-                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                    outDesc);
-
-            createPartitionConstraint(spec, grouper, outSplits);
-
-            // Connect sorter with the pre-cluster
-            OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
-                    spec);
-            spec.connect(sortGroupConn, sorter, 0, grouper, 0);
-            break;
-        case 2: // In-memory hash group
-            grouper = new HashGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }),
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new MultiAggregatorFactory(
-                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                    outDesc, htSize);
-
-            createPartitionConstraint(spec, grouper, outSplits);
-
-            // Connect scanner with the grouper
-            IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanConn, fileScanner, 0, grouper, 0);
-            break;
-        case 3: // new external hash graph
-            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor(
+        case 0: // new external hash graph
+            grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
                     spec,
                     keys,
                     framesLimit,
@@ -358,7 +261,7 @@
             spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
             
             break;
-        case 4: // External-sort + new-precluster
+        case 1: // External-sort + new-precluster
             ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(
                     spec, framesLimit, keys, new IBinaryComparatorFactory[] {
                     // IntegerBinaryComparatorFactory.INSTANCE,
@@ -373,7 +276,7 @@
                             IntegerBinaryHashFunctionFactory.INSTANCE }));
             spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
 
-            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor(
+            grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(
                     spec,
                     keys,
                     new IBinaryComparatorFactory[] {
@@ -390,8 +293,8 @@
                     spec);
             spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
             break;
-        case 5: // Inmem
-            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor(
+        case 2: // Inmem
+            grouper = new HashGroupOperatorDescriptor(
                     spec,
                     keys,
                     new FieldHashPartitionComputerFactory(keys,
@@ -416,7 +319,7 @@
             spec.connect(scanConn2, fileScanner, 0, grouper, 0);
             break;
         default:
-            grouper = new ExternalGroupOperatorDescriptor(
+            grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
                     spec,
                     keys,
                     framesLimit,
@@ -424,19 +327,18 @@
                     // IntegerBinaryComparatorFactory.INSTANCE,
                     IntegerBinaryComparatorFactory.INSTANCE },
                     new IntegerNormalizedKeyComputerFactory(),
-                    new MultiAggregatorDescriptorFactory(
-                            new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
-                    new MultiAggregatorDescriptorFactory(
-                            new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
-                                    keys.length) }),
-                    outDesc,
-                    new HashSpillableGroupingTableFactory(
-                            new FieldHashPartitionComputerFactory(keys,
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
+                                    false) }),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
+                                    keys.length, false) }), outDesc,
+                    new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
                                     new IBinaryHashFunctionFactory[] {
                                     // IntegerBinaryHashFunctionFactory.INSTANCE,
                                     IntegerBinaryHashFunctionFactory.INSTANCE }),
                             htSize), false);
-
+            
             createPartitionConstraint(spec, grouper, outSplits);
 
             // Connect scanner with the grouper
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 0fa6424..8cac822 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -39,9 +39,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -51,7 +48,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
@@ -143,8 +143,8 @@
             gBy = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
                     new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                    new MultiAggregatorFactory(
-                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc, htSize);
             createPartitionConstraint(spec, gBy, outSplits);
             IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -166,8 +166,8 @@
 
             gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                    new MultiAggregatorFactory(
-                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc);
             createPartitionConstraint(spec, gBy, outSplits);
             OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index a43aafb..9e88f8c 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -45,9 +45,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -57,6 +54,9 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -316,8 +316,8 @@
                             new int[] { 6 },
                             new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                    new MultiAggregatorFactory(
-                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc, 16);
             createPartitionConstraint(spec, gby, resultSplits);