Update issue #52: 

- rewrote the aggregator interface to create a state factory; 
- added a wrapper interface for aggregation and changed the original aggregators to be field aggregators, and added a multi-field aggregator wrapper;
- rewrote test cases for new interface; 
- added count field aggregator.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@879 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index f9c3b90..f273c37 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -37,9 +37,5 @@
     public Object getState() {
         return state;
     }
-
-    public int getLength() {
-        return -1;
-    }
     
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index ba538f7..680e7cb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -67,8 +67,9 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory firstNormalizerFactory;
 
-    private final IFieldAggregateDescriptorFactory[] aggregateFactories;
-    private final IFieldAggregateDescriptorFactory[] mergeFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final IAggregatorDescriptorFactory mergerFactory;
+
     private final int framesLimit;
     private final ISpillableTableFactory spillableTableFactory;
     private final boolean isOutputSorted;
@@ -77,8 +78,8 @@
             int[] keyFields, int framesLimit,
             IBinaryComparatorFactory[] comparatorFactories,
             INormalizedKeyComputerFactory firstNormalizerFactory,
-            IFieldAggregateDescriptorFactory[] aggregateFactories,
-            IFieldAggregateDescriptorFactory[] mergeFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            IAggregatorDescriptorFactory mergerFactory,
             RecordDescriptor recordDescriptor,
             ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
         super(spec, 1, 1);
@@ -92,8 +93,8 @@
                     "frame limit should at least be 2, but it is "
                             + framesLimit + "!");
         }
-        this.aggregateFactories = aggregateFactories;
-        this.mergeFactories = mergeFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.mergerFactory = mergerFactory;
         this.keyFields = keyFields;
         this.comparatorFactories = comparatorFactories;
         this.firstNormalizerFactory = firstNormalizerFactory;
@@ -180,7 +181,7 @@
                     state.runs = new LinkedList<RunFileReader>();
                     state.gTable = spillableTableFactory.buildSpillableTable(
                             ctx, keyFields, comparatorFactories,
-                            firstNormalizerFactory, aggregateFactories,
+                            firstNormalizerFactory, aggregatorFactory,
                             recordDescProvider.getInputRecordDescriptor(
                                     getOperatorId(), 0), recordDescriptors[0],
                             ExternalGroupOperatorDescriptor.this.framesLimit);
@@ -273,14 +274,13 @@
                 comparators[i] = comparatorFactories[i]
                         .createBinaryComparator();
             }
-            final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
-            final AggregateState[] aggregateStates = new AggregateState[mergeFactories.length];
-            for (int i = 0; i < currentWorkingAggregators.length; i++) {
-                currentWorkingAggregators[i] = mergeFactories[i]
-                        .createAggregator(ctx, recordDescriptors[0],
-                                recordDescriptors[0]);
-                aggregateStates[i] = currentWorkingAggregators[i].createState();
-            }
+
+            final IAggregatorDescriptor aggregator = mergerFactory
+                    .createAggregator(ctx, recordDescriptors[0],
+                            recordDescriptors[0], keyFields);
+            final AggregateState aggregateState = aggregator
+                    .createAggregateStates();
+
             final int[] storedKeys = new int[keyFields.length];
             /**
              * Get the list of the fields in the stored records.
@@ -288,11 +288,6 @@
             for (int i = 0; i < keyFields.length; ++i) {
                 storedKeys[i] = i;
             }
-            /**
-             * Tuple builder
-             */
-            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
-                    recordDescriptors[0].getFields().length);
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 /**
@@ -455,55 +450,28 @@
                                  * Initialize the first output record Reset the
                                  * tuple builder
                                  */
-                                tupleBuilder.reset();
-                                for (int i = 0; i < keyFields.length; i++) {
-                                    tupleBuilder.addField(fta, tupleIndex, i);
-                                }
-                                for (int i = 0; i < currentWorkingAggregators.length; i++) {
-                                    currentWorkingAggregators[i].init(fta,
-                                            tupleIndex,
-                                            tupleBuilder.getDataOutput(),
-                                            aggregateStates[i]);
-                                    tupleBuilder.addFieldEndOffset();
-                                }
-                                if (!outFrameAppender.append(
-                                        tupleBuilder.getFieldEndOffsets(),
-                                        tupleBuilder.getByteArray(), 0,
-                                        tupleBuilder.getSize())) {
+
+                                if (!aggregator.init(outFrameAppender, fta,
+                                        tupleIndex, aggregateState)) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!outFrameAppender.append(
-                                            tupleBuilder.getFieldEndOffsets(),
-                                            tupleBuilder.getByteArray(), 0,
-                                            tupleBuilder.getSize()))
+                                    if (!aggregator.init(outFrameAppender, fta,
+                                            tupleIndex, aggregateState)) {
                                         throw new HyracksDataException(
                                                 "Failed to append an aggregation result to the output frame.");
+                                    }
                                 }
+
                             } else {
                                 /**
                                  * if new tuple is in the same group of the
                                  * current aggregator do merge and output to the
                                  * outFrame
                                  */
-                                int tupleOffset = outFrameAccessor
-                                        .getTupleStartOffset(currentTupleInOutFrame);
-                                int fieldOffset = outFrameAccessor
-                                        .getFieldStartOffset(
-                                                currentTupleInOutFrame,
-                                                keyFields.length);
-                                for (int i = 0; i < currentWorkingAggregators.length; i++) {
-                                    currentWorkingAggregators[i]
-                                            .aggregate(
-                                                    fta,
-                                                    tupleIndex,
-                                                    outFrameAccessor
-                                                            .getBuffer()
-                                                            .array(),
-                                                    tupleOffset
-                                                            + outFrameAccessor
-                                                                    .getFieldSlotsLength()
-                                                            + fieldOffset,
-                                                    aggregateStates[i]);
-                                }
+
+                                aggregator.aggregate(fta, tupleIndex,
+                                        outFrameAccessor,
+                                        currentTupleInOutFrame, aggregateState);
+
                             }
                             tupleIndices[runIndex]++;
                             setNextTopTuple(runIndex, tupleIndices,
@@ -514,9 +482,8 @@
                             flushOutFrame(writer, finalPass);
                         }
 
-                        for (int i = 0; i < currentWorkingAggregators.length; i++) {
-                            currentWorkingAggregators[i].close();
-                        }
+                        aggregator.close();
+                        
                         runs.subList(0, runNumber).clear();
                         /**
                          * insert the new run file into the beginning of the run
@@ -549,53 +516,30 @@
                     outFrameAccessor.reset(outFrame);
 
                     for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-                        int tupleOffset = outFrameAccessor
-                                .getTupleStartOffset(i);
-                        finalTupleBuilder.reset();
                         for (int j = 0; j < keyFields.length; j++) {
                             finalTupleBuilder.addField(outFrameAccessor, i, j);
                         }
-                        for (int j = 0; j < currentWorkingAggregators.length; j++) {
-                            int fieldOffset = outFrameAccessor
-                                    .getFieldStartOffset(i, keyFields.length
-                                            + j);
-                            if (isFinal)
-                                currentWorkingAggregators[j].outputFinalResult(
-                                        finalTupleBuilder.getDataOutput(),
-                                        outFrameAccessor.getBuffer().array(),
-                                        tupleOffset
-                                                + outFrameAccessor
-                                                        .getFieldSlotsLength()
-                                                + fieldOffset,
-                                        aggregateStates[j]);
-                            else
-                                currentWorkingAggregators[j]
-                                        .outputPartialResult(
-                                                finalTupleBuilder
-                                                        .getDataOutput(),
-                                                outFrameAccessor.getBuffer()
-                                                        .array(),
-                                                tupleOffset
-                                                        + outFrameAccessor
-                                                                .getFieldSlotsLength()
-                                                        + fieldOffset,
-                                                aggregateStates[j]);
-                            finalTupleBuilder.addFieldEndOffset();
+                        
+                        if(isFinal){
+                            if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                                }
+                            }
+                        } else {
+                            if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                                }
+                            }
                         }
-
-                        if (!writerFrameAppender.append(
-                                finalTupleBuilder.getFieldEndOffsets(),
-                                finalTupleBuilder.getByteArray(), 0,
-                                finalTupleBuilder.getSize())) {
-                            FrameUtils.flushFrame(writerFrame, writer);
-                            writerFrameAppender.reset(writerFrame, true);
-                            if (!writerFrameAppender.append(
-                                    finalTupleBuilder.getFieldEndOffsets(),
-                                    finalTupleBuilder.getByteArray(), 0,
-                                    finalTupleBuilder.getSize()))
-                                throw new HyracksDataException(
-                                        "Failed to write final aggregation result to a writer frame!");
-                        }
+                        aggregator.reset();
                     }
                     if (writerFrameAppender.getTupleCount() > 0) {
                         FrameUtils.flushFrame(writerFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index a61fd16..28a71ac 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -28,206 +28,186 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 
 class GroupingHashTable {
-	/**
-	 * The pointers in the link store 3 int values for each entry in the
-	 * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-	 * 
-	 * @author vinayakb
-	 */
-	private static class Link {
-		private static final int INIT_POINTERS_SIZE = 9;
+    /**
+     * The pointers in the link store 3 int values for each entry in the
+     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * 
+     * @author vinayakb
+     */
+    private static class Link {
+        private static final int INIT_POINTERS_SIZE = 9;
 
-		int[] pointers;
-		int size;
+        int[] pointers;
+        int size;
 
-		Link() {
-			pointers = new int[INIT_POINTERS_SIZE];
-			size = 0;
-		}
+        Link() {
+            pointers = new int[INIT_POINTERS_SIZE];
+            size = 0;
+        }
 
-		void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-			while (size + 3 > pointers.length) {
-				pointers = Arrays.copyOf(pointers, pointers.length * 2);
-			}
-			pointers[size++] = bufferIdx;
-			pointers[size++] = tIndex;
-			pointers[size++] = accumulatorIdx;
-		}
-	}
+        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+            while (size + 3 > pointers.length) {
+                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+            }
+            pointers[size++] = bufferIdx;
+            pointers[size++] = tIndex;
+            pointers[size++] = accumulatorIdx;
+        }
+    }
 
-	private static final int INIT_ACCUMULATORS_SIZE = 8;
-	private final IHyracksTaskContext ctx;
-	private final FrameTupleAppender appender;
-	private final List<ByteBuffer> buffers;
-	private final Link[] table;
-	private AggregateState[][] aggregateStates;
-	private int accumulatorSize;
+    private static final int INIT_AGG_STATE_SIZE = 8;
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleAppender appender;
+    private final List<ByteBuffer> buffers;
+    private final Link[] table;
+    private AggregateState[] aggregateStates;
+    private int accumulatorSize;
 
-	private int lastBIndex;
-	private final int[] fields;
-	private final int[] storedKeys;
-	private final IBinaryComparator[] comparators;
-	private final FrameTuplePairComparator ftpc;
-	private final ITuplePartitionComputer tpc;
-	private final IFieldAggregateDescriptor[] aggregators;
-	private final RecordDescriptor inRecordDescriptor;
-	private final RecordDescriptor outRecordDescriptor;
+    private int lastBIndex;
+    private final int[] fields;
+    private final int[] storedKeys;
+    private final IBinaryComparator[] comparators;
+    private final FrameTuplePairComparator ftpc;
+    private final ITuplePartitionComputer tpc;
+    private final IAggregatorDescriptor aggregator;
 
-	private final FrameTupleAccessor storedKeysAccessor;
+    private final FrameTupleAccessor storedKeysAccessor;
 
-	GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			ITuplePartitionComputerFactory tpcf,
-			IFieldAggregateDescriptorFactory[] aggregatorFactories,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int tableSize)
-			throws HyracksDataException {
-		this.ctx = ctx;
-		appender = new FrameTupleAppender(ctx.getFrameSize());
-		buffers = new ArrayList<ByteBuffer>();
-		table = new Link[tableSize];
-		
-		this.aggregateStates = new AggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
-		accumulatorSize = 0;
-		
-		this.fields = fields;
-		storedKeys = new int[fields.length];
-		@SuppressWarnings("rawtypes")
-		ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-		for (int i = 0; i < fields.length; ++i) {
-			storedKeys[i] = i;
-			storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-		}
-		
-		comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
-		ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-		tpc = tpcf.createPartitioner();
-		
-		this.inRecordDescriptor = inRecordDescriptor;
-		this.outRecordDescriptor = outRecordDescriptor;
-		
-		this.aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
-		for (int i = 0; i < aggregatorFactories.length; i++) {
-			this.aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
-					this.inRecordDescriptor, this.outRecordDescriptor);
-		}
-		RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
-				storedKeySerDeser);
-		storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				storedKeysRecordDescriptor);
-		lastBIndex = -1;
-		addNewBuffer();
-	}
+    GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+            IBinaryComparatorFactory[] comparatorFactories,
+            ITuplePartitionComputerFactory tpcf,
+            IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int tableSize)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        buffers = new ArrayList<ByteBuffer>();
+        table = new Link[tableSize];
 
-	private void addNewBuffer() {
-		ByteBuffer buffer = ctx.allocateFrame();
-		buffer.position(0);
-		buffer.limit(buffer.capacity());
-		buffers.add(buffer);
-		appender.reset(buffer, true);
-		++lastBIndex;
-	}
+        this.fields = fields;
+        storedKeys = new int[fields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+        for (int i = 0; i < fields.length; ++i) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+        }
 
-	private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
-			throws HyracksDataException {
-		ByteBuffer frame = appender.getBuffer();
-		frame.position(0);
-		frame.limit(frame.capacity());
-		writer.nextFrame(appender.getBuffer());
-		appender.reset(appender.getBuffer(), true);
-	}
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+        tpc = tpcf.createPartitioner();
 
-	void insert(FrameTupleAccessor accessor, int tIndex)
-			throws Exception {
-		int entry = tpc.partition(accessor, tIndex, table.length);
-		Link link = table[entry];
-		if (link == null) {
-			link = table[entry] = new Link();
-		}
-		int saIndex = -1;
-		for (int i = 0; i < link.size; i += 3) {
-			int sbIndex = link.pointers[i];
-			int stIndex = link.pointers[i + 1];
-			storedKeysAccessor.reset(buffers.get(sbIndex));
-			int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
-			if (c == 0) {
-				saIndex = link.pointers[i + 2];
-				break;
-			}
-		}
-		if (saIndex < 0) {
-			// Did not find the key. Insert a new entry.
-			saIndex = accumulatorSize++;
-			if (!appender.appendProjection(accessor, tIndex, fields)) {
-				addNewBuffer();
-				if (!appender.appendProjection(accessor, tIndex, fields)) {
-					throw new IllegalStateException();
-				}
-			}
-			int sbIndex = lastBIndex;
-			int stIndex = appender.getTupleCount() - 1;
-			for (int i = 0; i < aggregators.length; i++) {
-				AggregateState aggState = aggregators[i].createState();
-				aggregators[i].init(accessor, tIndex, null, aggState);
-				if (saIndex >= aggregateStates[i].length) {
-					aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
-							aggregateStates[i].length * 2);
-				}
-				aggregateStates[i][saIndex] = aggState;
-			}
-			link.add(sbIndex, stIndex, saIndex);
-		} else {
-			for (int i = 0; i < aggregators.length; i++) {
-				aggregators[i].aggregate(accessor, tIndex, null, 0,
-						aggregateStates[i][saIndex]);
-			}
-		}
-	}
+        this.aggregator = aggregatorFactory.createAggregator(ctx,
+                inRecordDescriptor, outRecordDescriptor, fields);
 
-	void write(IFrameWriter writer) throws HyracksDataException {
-		ByteBuffer buffer = ctx.allocateFrame();
-		appender.reset(buffer, true);
-		ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
-				outRecordDescriptor.getFields().length);
-		for (int i = 0; i < table.length; ++i) {
-			Link link = table[i];
-			if (link != null) {
-				for (int j = 0; j < link.size; j += 3) {
-					int bIndex = link.pointers[j];
-					int tIndex = link.pointers[j + 1];
-					int aIndex = link.pointers[j + 2];
-					ByteBuffer keyBuffer = buffers.get(bIndex);
-					storedKeysAccessor.reset(keyBuffer);
+        this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+        accumulatorSize = 0;
 
-					tupleBuilder.reset();
-					for (int k : this.fields) {
-						tupleBuilder.addField(storedKeysAccessor, tIndex, k);
-					}
-					for (int k = 0; k < aggregators.length; k++) {
-						aggregators[k].outputFinalResult(
-								tupleBuilder.getDataOutput(), null, 0,
-								aggregateStates[k][aIndex]);
-						tupleBuilder.addFieldEndOffset();
-					}
-					while (!appender.append(tupleBuilder.getFieldEndOffsets(),
-							tupleBuilder.getByteArray(), 0,
-							tupleBuilder.getSize())) {
-						flushFrame(appender, writer);
-					}
-				}
-			}
-		}
-		if (appender.getTupleCount() != 0) {
-			flushFrame(appender, writer);
-		}
-	}
+        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+                storedKeySerDeser);
+        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                storedKeysRecordDescriptor);
+        lastBIndex = -1;
+        addNewBuffer();
+    }
+
+    private void addNewBuffer() {
+        ByteBuffer buffer = ctx.allocateFrame();
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        buffers.add(buffer);
+        appender.reset(buffer, true);
+        ++lastBIndex;
+    }
+
+    private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+            throws HyracksDataException {
+        ByteBuffer frame = appender.getBuffer();
+        frame.position(0);
+        frame.limit(frame.capacity());
+        writer.nextFrame(appender.getBuffer());
+        appender.reset(appender.getBuffer(), true);
+    }
+
+    void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
+        int entry = tpc.partition(accessor, tIndex, table.length);
+        Link link = table[entry];
+        if (link == null) {
+            link = table[entry] = new Link();
+        }
+        int saIndex = -1;
+        for (int i = 0; i < link.size; i += 3) {
+            int sbIndex = link.pointers[i];
+            int stIndex = link.pointers[i + 1];
+            storedKeysAccessor.reset(buffers.get(sbIndex));
+            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
+            if (c == 0) {
+                saIndex = link.pointers[i + 2];
+                break;
+            }
+        }
+        if (saIndex < 0) {
+            // Did not find the key. Insert a new entry.
+            saIndex = accumulatorSize++;
+            // Add keys
+            if (!appender.appendProjection(accessor, tIndex, fields)) {
+                addNewBuffer();
+                if (!appender.appendProjection(accessor, tIndex, fields)) {
+                    throw new IllegalStateException();
+                }
+            }
+            // Add aggregation fields
+            int sbIndex = lastBIndex;
+            int stIndex = appender.getTupleCount() - 1;
+            AggregateState newState = aggregator.createAggregateStates();
+            aggregator.init(appender, accessor, tIndex, newState);
+
+            if (accumulatorSize >= aggregateStates.length) {
+                aggregateStates = Arrays.copyOf(aggregateStates,
+                        aggregateStates.length * 2);
+            }
+
+            aggregateStates[saIndex] = newState;
+
+            link.add(sbIndex, stIndex, saIndex);
+        } else {
+            aggregator.aggregate(accessor, tIndex, null, 0,
+                    aggregateStates[saIndex]);
+        }
+    }
+
+    void write(IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer buffer = ctx.allocateFrame();
+        appender.reset(buffer, true);
+        for (int i = 0; i < table.length; ++i) {
+            Link link = table[i];
+            if (link != null) {
+                for (int j = 0; j < link.size; j += 3) {
+                    int bIndex = link.pointers[j];
+                    int tIndex = link.pointers[j + 1];
+                    int aIndex = link.pointers[j + 2];
+                    ByteBuffer keyBuffer = buffers.get(bIndex);
+                    storedKeysAccessor.reset(keyBuffer);
+
+                    while (!aggregator.outputFinalResult(appender,
+                            storedKeysAccessor, tIndex, aggregateStates[aIndex])) {
+                        flushFrame(appender, writer);
+                    }
+                    aggregator.reset();
+                }
+            }
+        }
+        if (appender.getTupleCount() != 0) {
+            flushFrame(appender, writer);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
index 1b46bdd..2596b91 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
@@ -53,20 +53,20 @@
     private final ITuplePartitionComputerFactory tpcf;
     private final IBinaryComparatorFactory[] comparatorFactories;
 
-    private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
 
     private final int tableSize;
 
     public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
             ITuplePartitionComputerFactory tpcf,
             IBinaryComparatorFactory[] comparatorFactories,
-            IFieldAggregateDescriptorFactory[] aggregatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDescriptor, int tableSize) {
         super(spec, 1, 1);
         this.keys = keys;
         this.tpcf = tpcf;
         this.comparatorFactories = comparatorFactories;
-        this.aggregatorFactories = aggregatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
         recordDescriptors[0] = outRecordDescriptor;
         this.tableSize = tableSize;
     }
@@ -138,7 +138,7 @@
                     state = new HashBuildActivityState(ctx.getJobletContext()
                             .getJobId(), new TaskId(getActivityId(), partition));
                     state.table = new GroupingHashTable(ctx, keys,
-                            comparatorFactories, tpcf, aggregatorFactories,
+                            comparatorFactories, tpcf, aggregatorFactory,
                             recordDescProvider.getInputRecordDescriptor(
                                     getOperatorId(), 0), recordDescriptors[0],
                             tableSize);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index b3c36e6..1299a1e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -30,7 +30,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -72,7 +71,7 @@
             final int[] keyFields,
             IBinaryComparatorFactory[] comparatorFactories,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-            IFieldAggregateDescriptorFactory[] aggregatorFactories,
+            IAggregatorDescriptorFactory aggregateFactory,
             RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, final int framesLimit)
             throws HyracksDataException {
@@ -122,25 +121,15 @@
         final ITuplePartitionComputer tpc = tpcf.createPartitioner();
         final ByteBuffer outFrame = ctx.allocateFrame();
 
-        final ArrayTupleBuilder internalTupleBuilder;
-        if (keyFields.length < outRecordDescriptor.getFields().length)
-            internalTupleBuilder = new ArrayTupleBuilder(
-                    outRecordDescriptor.getFields().length);
-        else
-            internalTupleBuilder = new ArrayTupleBuilder(
-                    outRecordDescriptor.getFields().length + 1);
-        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
-                outRecordDescriptor.getFields().length);
         final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
                 : firstKeyNormalizerFactory.createNormalizedKeyComputer();
 
-        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
-        final AggregateState[] aggregateStates = new AggregateState[aggregatorFactories.length];
-        for (int i = 0; i < aggregators.length; i++) {
-            aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
-                    inRecordDescriptor, outRecordDescriptor);
-            aggregateStates[i] = aggregators[i].createState();
-        }
+        final IAggregatorDescriptor aggregator = aggregateFactory
+                .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
+                        keyFields);
+
+        final AggregateState aggregateState = aggregator
+                .createAggregateStates();
 
         return new ISpillableTable() {
 
@@ -206,9 +195,7 @@
                 dataFrameCount = -1;
                 tPointers = null;
                 table.reset();
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].close();
-                }
+                aggregator.reset();
             }
 
             @Override
@@ -234,33 +221,15 @@
                 } while (true);
 
                 if (!foundGroup) {
-                    /**
-                     * If no matching group is found, create a new aggregator
-                     * Create a tuple for the new group
-                     */
-                    internalTupleBuilder.reset();
-                    for (int i = 0; i < keyFields.length; i++) {
-                        internalTupleBuilder.addField(accessor, tIndex,
-                                keyFields[i]);
-                    }
-                    for (int i = 0; i < aggregators.length; i++) {
-                        aggregators[i].init(accessor, tIndex,
-                                internalTupleBuilder.getDataOutput(),
-                                aggregateStates[i]);
-                        internalTupleBuilder.addFieldEndOffset();
-                    }
-                    if (!appender.append(
-                            internalTupleBuilder.getFieldEndOffsets(),
-                            internalTupleBuilder.getByteArray(), 0,
-                            internalTupleBuilder.getSize())) {
+
+                    if (!aggregator.init(appender, accessor, tIndex,
+                            aggregateState)) {
                         if (!nextAvailableFrame()) {
                             return false;
                         } else {
-                            if (!appender.append(
-                                    internalTupleBuilder.getFieldEndOffsets(),
-                                    internalTupleBuilder.getByteArray(), 0,
-                                    internalTupleBuilder.getSize())) {
-                                throw new IllegalStateException(
+                            if (!aggregator.init(appender, accessor, tIndex,
+                                    aggregateState)) {
+                                throw new HyracksDataException(
                                         "Failed to init an aggregator");
                             }
                         }
@@ -270,24 +239,10 @@
                     storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
                     table.insert(entry, storedTuplePointer);
                 } else {
-                    // If there is a matching found, do aggregation directly
-                    int tupleOffset = storedKeysAccessor1
-                            .getTupleStartOffset(storedTuplePointer.tupleIndex);
 
-                    for (int i = 0; i < aggregators.length; i++) {
-                        int aggFieldOffset = storedKeysAccessor1
-                                .getFieldStartOffset(
-                                        storedTuplePointer.tupleIndex,
-                                        keyFields.length + i);
-                        aggregators[i].aggregate(
-                                accessor,
-                                tIndex,
-                                storedKeysAccessor1.getBuffer().array(),
-                                tupleOffset
-                                        + storedKeysAccessor1
-                                                .getFieldSlotsLength()
-                                        + aggFieldOffset, aggregateStates[i]);
-                    }
+                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1,
+                            storedTuplePointer.tupleIndex, aggregateState);
+
                 }
                 return true;
             }
@@ -323,62 +278,29 @@
                             int tIndex = storedTuplePointer.tupleIndex;
 
                             storedKeysAccessor1.reset(frames.get(bIndex));
-                            int tupleOffset = storedKeysAccessor1
-                                    .getTupleStartOffset(tIndex);
-                            // Reset the tuple for the partial result
-                            outputTupleBuilder.reset();
-                            for (int k = 0; k < keyFields.length; k++) {
-                                outputTupleBuilder.addField(
-                                        storedKeysAccessor1, tIndex, k);
-                            }
-                            for (int k = 0; k < aggregators.length; k++) {
-                                int fieldStart = storedKeysAccessor1
-                                        .getFieldStartOffset(tIndex,
-                                                keyFields.length + k);
-                                if (isPartial)
-                                    aggregators[k]
-                                            .outputPartialResult(
-                                                    outputTupleBuilder
-                                                            .getDataOutput(),
-                                                    storedKeysAccessor1
-                                                            .getBuffer()
-                                                            .array(),
-                                                    tupleOffset
-                                                            + storedKeysAccessor1
-                                                                    .getFieldSlotsLength()
-                                                            + fieldStart,
-                                                    aggregateStates[k]);
-                                else
-                                    aggregators[k]
-                                            .outputFinalResult(
-                                                    outputTupleBuilder
-                                                            .getDataOutput(),
-                                                    storedKeysAccessor1
-                                                            .getBuffer()
-                                                            .array(),
-                                                    tupleOffset
-                                                            + storedKeysAccessor1
-                                                                    .getFieldSlotsLength()
-                                                            + fieldStart,
-                                                    aggregateStates[k]);
-                                outputTupleBuilder.addFieldEndOffset();
-                            }
 
-                            while (!appender.append(
-                                    outputTupleBuilder.getFieldEndOffsets(),
-                                    outputTupleBuilder.getByteArray(), 0,
-                                    outputTupleBuilder.getSize())) {
-                                FrameUtils.flushFrame(outFrame, writer);
-                                appender.reset(outFrame, true);
+                            if (isPartial) {
+                                while (!aggregator.outputPartialResult(
+                                        appender, storedKeysAccessor1, tIndex,
+                                        aggregateState)) {
+                                    FrameUtils.flushFrame(outFrame, writer);
+                                    appender.reset(outFrame, true);
+                                }
+                            } else {
+                                while (!aggregator.outputFinalResult(appender,
+                                        storedKeysAccessor1, tIndex,
+                                        aggregateState)) {
+                                    FrameUtils.flushFrame(outFrame, writer);
+                                    appender.reset(outFrame, true);
+                                }
                             }
+                            aggregator.reset();
                         } while (true);
                     }
                     if (appender.getTupleCount() != 0) {
                         FrameUtils.flushFrame(outFrame, writer);
                     }
-                    for (int i = 0; i < aggregators.length; i++) {
-                        aggregators[i].close();
-                    }
+                    aggregator.close();
                     return;
                 }
                 int n = tPointers.length / 3;
@@ -393,54 +315,41 @@
                     ByteBuffer buffer = frames.get(frameIndex);
                     storedKeysAccessor1.reset(buffer);
 
-                    int tupleOffset = storedKeysAccessor1
-                            .getTupleStartOffset(tupleIndex);
-
-                    outputTupleBuilder.reset();
-                    for (int k = 0; k < keyFields.length; k++) {
-                        outputTupleBuilder.addField(storedKeysAccessor1,
-                                tupleIndex, k);
-                    }
-                    for (int k = 0; k < aggregators.length; k++) {
-                        int fieldStart = storedKeysAccessor1
-                                .getFieldStartOffset(tupleIndex,
-                                        keyFields.length + k);
-                        if (isPartial)
-                            aggregators[k].outputPartialResult(
-                                    outputTupleBuilder.getDataOutput(),
-                                    storedKeysAccessor1.getBuffer().array(),
-                                    tupleOffset
-                                            + storedKeysAccessor1
-                                                    .getFieldSlotsLength()
-                                            + fieldStart, aggregateStates[k]);
-                        else
-                            aggregators[k].outputFinalResult(outputTupleBuilder
-                                    .getDataOutput(), storedKeysAccessor1
-                                    .getBuffer().array(), tupleOffset
-                                    + storedKeysAccessor1.getFieldSlotsLength()
-                                    + fieldStart, aggregateStates[k]);
-                        outputTupleBuilder.addFieldEndOffset();
-                    }
-                    if (!appender.append(
-                            outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0,
-                            outputTupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outFrame, writer);
-                        appender.reset(outFrame, true);
-                        if (!appender.append(
-                                outputTupleBuilder.getFieldEndOffsets(),
-                                outputTupleBuilder.getByteArray(), 0,
-                                outputTupleBuilder.getSize())) {
-                            throw new IllegalStateException();
+                    if (isPartial) {
+                        if (!aggregator
+                                .outputPartialResult(appender,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState)) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            appender.reset(outFrame, true);
+                            if (!aggregator.outputPartialResult(appender,
+                                    storedKeysAccessor1, tupleIndex,
+                                    aggregateState)) {
+                                throw new HyracksDataException(
+                                        "Failed to output partial result.");
+                            }
+                        }
+                    } else {
+                        if (!aggregator
+                                .outputFinalResult(appender,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState)) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            appender.reset(outFrame, true);
+                            if (!aggregator.outputFinalResult(appender,
+                                    storedKeysAccessor1, tupleIndex,
+                                    aggregateState)) {
+                                throw new HyracksDataException(
+                                        "Failed to output partial result.");
+                            }
                         }
                     }
+                    aggregator.reset();
                 }
                 if (appender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(outFrame, writer);
                 }
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].close();
-                }
+                aggregator.close();
             }
 
             @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
new file mode 100644
index 0000000..75c546f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public interface IAggregateStateFactory extends Serializable {
+
+    public int getStateLength();
+    
+    public Object createState();
+    
+    public boolean hasBinaryState();
+    
+    public boolean hasObjectState();
+    
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
new file mode 100644
index 0000000..176806f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ *
+ */
+public interface IAggregatorDescriptor {
+
+    /**
+     * Create an aggregate state
+     * 
+     * @return
+     */
+    public AggregateState createAggregateStates();
+    
+    public int getAggregateStatesLength();
+
+    /**
+     * Initialize the state based on the input tuple.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param fieldOutput
+     *            The data output for the frame containing the state. This may
+     *            be null, if the state is maintained as a java object
+     * @param state
+     *            The state to be initialized.
+     * @throws HyracksDataException
+     */
+    public boolean init(FrameTupleAppender appender,
+            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException;
+
+    /**
+     * Reset the aggregator. The corresponding aggregate state should be reset
+     * too. Note that here the frame is not an input argument, since it can be
+     * reset outside of the aggregator (simply reset the starting index of the
+     * buffer).
+     * 
+     * @param state
+     */
+    public void reset();
+
+    /**
+     * Aggregate the value. Aggregate state should be updated correspondingly.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param data
+     *            The buffer containing the state, if frame-based-state is used.
+     *            This means that it can be null if java-object-based-state is
+     *            used.
+     * @param offset
+     * @param state
+     *            The aggregate state.
+     * @throws HyracksDataException
+     */
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+            IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+            AggregateState state) throws HyracksDataException;
+
+    /**
+     * Output the partial aggregation result.
+     * 
+     * @param fieldOutput
+     *            The data output for the output frame
+     * @param data
+     *            The buffer containing the aggregation state
+     * @param offset
+     * @param state
+     *            The aggregation state.
+     * @throws HyracksDataException
+     */
+    public boolean outputPartialResult(FrameTupleAppender appender,
+            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException;
+
+    /**
+     * Output the final aggregation result.
+     * 
+     * @param fieldOutput
+     *            The data output for the output frame
+     * @param data
+     *            The buffer containing the aggregation state
+     * @param offset
+     * @param state
+     *            The aggregation state.
+     * @throws HyracksDataException
+     */
+    public boolean outputFinalResult(FrameTupleAppender appender,
+            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException;
+
+    public void close();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..65b3873
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ *
+ */
+public interface IAggregatorDescriptorFactory extends Serializable {
+
+    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+    
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 9bceea3..50da4cb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -23,13 +23,8 @@
  *
  */
 public interface IFieldAggregateDescriptor {
-
-    /**
-     * Create an aggregate state
-     * 
-     * @return
-     */
-    public AggregateState createState();
+    
+    public IAggregateStateFactory getAggregateStateFactory();
 
     /**
      * Initialize the state based on the input tuple.
@@ -46,6 +41,22 @@
     public void init(IFrameTupleAccessor accessor, int tIndex,
             DataOutput fieldOutput, AggregateState state)
             throws HyracksDataException;
+    
+    /**
+     * Initialize the state based on the input tuple.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param fieldOutput
+     *            The data output for the frame containing the state. This may
+     *            be null, if the state is maintained as a java object
+     * @param state
+     *            The state to be initialized.
+     * @throws HyracksDataException
+     */
+    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
+            DataOutput fieldOutput, AggregateState state)
+            throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
@@ -55,7 +66,7 @@
      * 
      * @param state
      */
-    public void reset(AggregateState state);
+    public void reset();
 
     /**
      * Aggregate the value. Aggregate state should be updated correspondingly.
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
index 6420766..bae041d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
@@ -25,7 +25,7 @@
 public interface ISpillableTableFactory extends Serializable {
     ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
             IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
-            IFieldAggregateDescriptorFactory[] aggregatorFactories, RecordDescriptor inRecordDescriptor,
+            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
similarity index 69%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
index 401fc51..3ecb968 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
@@ -24,20 +24,24 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
 /**
  *
  */
-public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
+public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
     
     private static final long serialVersionUID = 1L;
     
     private final int aggField;
     
-    public AvgAggregatorFactory(int aggField){
+    private final boolean useObjectState;
+    
+    public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
         this.aggField = aggField;
+        this.useObjectState = useObjectState;
     }
     
     /* (non-Javadoc)
@@ -51,15 +55,14 @@
         return new IFieldAggregateDescriptor() {
             
             @Override
-            public void reset(AggregateState state) {
-                state.reset();
+            public void reset() {
             }
             
             @Override
             public void outputPartialResult(DataOutput fieldOutput, byte[] data,
                     int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
-                if (data != null) {
+                if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
@@ -80,7 +83,7 @@
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
                     int offset, AggregateState state) throws HyracksDataException {
                 int sum, count;
-                if (data != null) {
+                if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
@@ -109,7 +112,7 @@
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
                 count += 1;
-                if (fieldOutput != null) {
+                if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
                         fieldOutput.writeInt(count);
@@ -123,11 +126,6 @@
             }
             
             @Override
-            public AggregateState createState() {
-                return new AggregateState();
-            }
-            
-            @Override
             public void close() {
                 // TODO Auto-generated method stub
                 
@@ -145,7 +143,7 @@
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
                 count += 1;
-                if (data != null) {
+                if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
                     sum += buf.getInt(offset);
                     count += buf.getInt(offset + 4);
@@ -158,6 +156,63 @@
                     state.setState(new Integer[]{sum, count});
                 }
             }
+
+            @Override
+            public IAggregateStateFactory getAggregateStateFactory() {
+                return new IAggregateStateFactory() {
+                    
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean hasObjectState() {
+                        return useObjectState;
+                    }
+                    
+                    @Override
+                    public boolean hasBinaryState() {
+                        return !useObjectState;
+                    }
+                    
+                    @Override
+                    public int getStateLength() {
+                        return 8;
+                    }
+                    
+                    @Override
+                    public Object createState() {
+                        return new Integer[]{0, 0};
+                    }
+                };
+            }
+
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor,
+                    int tIndex, DataOutput fieldOutput, AggregateState state)
+                    throws HyracksDataException {
+                int sum = 0;
+                int count = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart);
+                count += IntegerSerializerDeserializer.getInt(accessor
+                        .getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength()
+                                + fieldStart + 4);
+                if (!useObjectState) {
+                    try {
+                        fieldOutput.writeInt(sum);
+                        fieldOutput.writeInt(count);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(
+                                "I/O exception when initializing the aggregator.");
+                    }
+                } else {
+                    state.setState(new Integer[]{sum, count});
+                }
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
deleted file mode 100644
index df71f47..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-
-/**
- *
- */
-public class AvgMergeAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int aggField;
-    
-    public AvgMergeAggregatorFactory(int aggField){
-        this.aggField = aggField;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
-     * IFieldAggregateDescriptorFactory
-     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
-     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
-     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
-     */
-    @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        return new IFieldAggregateDescriptor() {
-            
-            @Override
-            public void reset(AggregateState state) {
-                state.reset();
-            }
-            
-            @Override
-            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
-                int sum, count;
-                if (data != null) {
-                    sum = IntegerSerializerDeserializer.getInt(data, offset);
-                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
-                } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum = fields[0];
-                    count = fields[1];
-                }
-                try {
-                    fieldOutput.writeInt(sum);
-                    fieldOutput.writeInt(count);
-                } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
-                }
-            }
-            
-            @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
-                int sum, count;
-                if (data != null) {
-                    sum = IntegerSerializerDeserializer.getInt(data, offset);
-                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
-                } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum = fields[0];
-                    count = fields[1];
-                }
-                try {
-                    fieldOutput.writeFloat((float)sum/count);
-                } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
-                }
-            }
-            
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
-                    throws HyracksDataException {
-                int sum = 0;
-                int count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart + 4);
-                if (fieldOutput != null) {
-                    try {
-                        fieldOutput.writeInt(sum);
-                        fieldOutput.writeInt(count);
-                    } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
-                    }
-                } else {
-                    state.setState(new Object[]{sum, count});
-                }
-            }
-            
-            @Override
-            public AggregateState createState() {
-                return new AggregateState();
-            }
-            
-            @Override
-            public void close() {
-                // TODO Auto-generated method stub
-                
-            }
-            
-            @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
-                int sum = 0, count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += 1;
-                if (data != null) {
-                    ByteBuffer buf = ByteBuffer.wrap(data);
-                    sum += buf.getInt(offset);
-                    count += buf.getInt(offset + 4);
-                    buf.putInt(offset, sum);
-                    buf.putInt(offset + 4, count);
-                } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum += fields[0];
-                    count += fields[1];
-                    state.setState(new Object[]{sum, count});
-                }
-            }
-        };
-    }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
similarity index 61%
copy from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
copy to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
index 401fc51..a56c669 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
@@ -24,22 +24,24 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
 /**
  *
  */
-public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
-    
+public class CountFieldAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
     private static final long serialVersionUID = 1L;
     
-    private final int aggField;
+    private final boolean useObjectState;
     
-    public AvgAggregatorFactory(int aggField){
-        this.aggField = aggField;
+    public CountFieldAggregatorFactory(boolean useObjectState){
+        this.useObjectState = useObjectState;
     }
-    
+
     /* (non-Javadoc)
      * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
@@ -47,28 +49,22 @@
     public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        
         return new IFieldAggregateDescriptor() {
             
             @Override
-            public void reset(AggregateState state) {
-                state.reset();
+            public void reset() {
             }
             
             @Override
             public void outputPartialResult(DataOutput fieldOutput, byte[] data,
                     int offset, AggregateState state) throws HyracksDataException {
-                int sum, count;
-                if (data != null) {
-                    sum = IntegerSerializerDeserializer.getInt(data, offset);
-                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                int count;
+                if (!useObjectState) {
+                    count = IntegerSerializerDeserializer.getInt(data, offset);
                 } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum = fields[0];
-                    count = fields[1];
+                    count = (Integer) state.getState();
                 }
                 try {
-                    fieldOutput.writeInt(sum);
                     fieldOutput.writeInt(count);
                 } catch (IOException e) {
                     throw new HyracksDataException(
@@ -79,17 +75,14 @@
             @Override
             public void outputFinalResult(DataOutput fieldOutput, byte[] data,
                     int offset, AggregateState state) throws HyracksDataException {
-                int sum, count;
-                if (data != null) {
-                    sum = IntegerSerializerDeserializer.getInt(data, offset);
-                    count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+                int count;
+                if (!useObjectState) {
+                    count = IntegerSerializerDeserializer.getInt(data, offset);
                 } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum = fields[0];
-                    count = fields[1];
+                    count = (Integer) state.getState();
                 }
                 try {
-                    fieldOutput.writeFloat((float)sum/count);
+                    fieldOutput.writeInt(count);
                 } catch (IOException e) {
                     throw new HyracksDataException(
                             "I/O exception when writing aggregation to the output buffer.");
@@ -100,36 +93,49 @@
             public void init(IFrameTupleAccessor accessor, int tIndex,
                     DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
-                int sum = 0;
-                int count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += 1;
-                if (fieldOutput != null) {
+                int count = 1;
+                if (!useObjectState) {
                     try {
-                        fieldOutput.writeInt(sum);
                         fieldOutput.writeInt(count);
                     } catch (IOException e) {
                         throw new HyracksDataException(
                                 "I/O exception when initializing the aggregator.");
                     }
                 } else {
-                    state.setState(new Integer[]{sum, count});
+                    state.setState(count);
                 }
             }
             
             @Override
-            public AggregateState createState() {
-                return new AggregateState();
+            public IAggregateStateFactory getAggregateStateFactory() {
+                return new IAggregateStateFactory() {
+                    
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean hasObjectState() {
+                        return useObjectState;
+                    }
+                    
+                    @Override
+                    public boolean hasBinaryState() {
+                        return !useObjectState;
+                    }
+                    
+                    @Override
+                    public int getStateLength() {
+                        return 4;
+                    }
+                    
+                    @Override
+                    public Object createState() {
+                        return new Integer(0);
+                    }
+                };
             }
             
             @Override
             public void close() {
-                // TODO Auto-generated method stub
                 
             }
             
@@ -137,27 +143,23 @@
             public void aggregate(IFrameTupleAccessor accessor, int tIndex,
                     byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
-                int sum = 0, count = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += 1;
-                if (data != null) {
+                int count = 1;
+                if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
-                    sum += buf.getInt(offset);
-                    count += buf.getInt(offset + 4);
-                    buf.putInt(offset, sum);
-                    buf.putInt(offset + 4, count);
+                    count += buf.getInt(offset);
+                    buf.putInt(offset, count);
                 } else {
-                    Integer[] fields = (Integer[])state.getState();
-                    sum += fields[0];
-                    count += fields[1];
-                    state.setState(new Integer[]{sum, count});
+                    count += (Integer) state.getState();
+                    state.setState(count);
                 }
             }
+
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor,
+                    int tIndex, DataOutput fieldOutput, AggregateState state)
+                    throws HyracksDataException {
+                init(accessor, tIndex, fieldOutput, state);
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
similarity index 73%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
index d3b52f3..7db12f7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
@@ -24,21 +24,25 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
 /**
  *
  */
-public class IntSumAggregatorFactory implements
+public class IntSumFieldAggregatorFactory implements
         IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
 
     private final int aggField;
+    
+    private final boolean useObjectState;
 
-    public IntSumAggregatorFactory(int aggField) {
+    public IntSumFieldAggregatorFactory(int aggField, boolean useObjState) {
         this.aggField = aggField;
+        this.useObjectState = useObjState;
     }
 
     /*
@@ -58,8 +62,7 @@
         return new IFieldAggregateDescriptor() {
 
             @Override
-            public void reset(AggregateState state) {
-                state.reset();
+            public void reset() {
             }
 
             @Override
@@ -67,10 +70,10 @@
                     byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
-                if (data != null) {
+                if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                 } else {
-                    sum = (Integer) (state.getState());
+                    sum = (Integer) state.getState();
                 }
                 try {
                     fieldOutput.writeInt(sum);
@@ -85,10 +88,10 @@
                     int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
-                if (data != null) {
+                if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                 } else {
-                    sum = (Integer) (state.getState());
+                    sum = (Integer) state.getState();
                 }
                 try {
                     fieldOutput.writeInt(sum);
@@ -109,7 +112,7 @@
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
-                if (fieldOutput != null) {
+                if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
                     } catch (IOException e) {
@@ -117,13 +120,36 @@
                                 "I/O exception when initializing the aggregator.");
                     }
                 } else {
-                    state.setState(new Integer(sum));
+                    state.setState(sum);
                 }
             }
 
             @Override
-            public AggregateState createState() {
-                return new AggregateState();
+            public IAggregateStateFactory getAggregateStateFactory(){
+                return new IAggregateStateFactory() {
+                    
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean hasObjectState() {
+                        return useObjectState;
+                    }
+                    
+                    @Override
+                    public boolean hasBinaryState() {
+                        return !useObjectState;
+                    }
+                    
+                    @Override
+                    public int getStateLength() {
+                        return 4;
+                    }
+                    
+                    @Override
+                    public Object createState() {
+                        return new Integer(0);
+                    }
+                };
             }
 
             @Override
@@ -142,15 +168,22 @@
                         .getBuffer().array(),
                         tupleOffset + accessor.getFieldSlotsLength()
                                 + fieldStart);
-                if (data != null) {
+                if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
                     sum += buf.getInt(offset);
                     buf.putInt(offset, sum);
                 } else {
-                    sum += (Integer) (state.getState());
-                    state.setState(new Integer(sum));
+                    sum += (Integer) state.getState();
+                    state.setState(sum);
                 }
             }
+
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor,
+                    int tIndex, DataOutput fieldOutput, AggregateState state)
+                    throws HyracksDataException {
+                init(accessor, tIndex, fieldOutput, state);
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
similarity index 81%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
index f53108a..b76c285 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -27,13 +27,14 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
 
 /**
  *
  */
-public class MinMaxStringAggregatorFactory implements
+public class MinMaxStringFieldAggregatorFactory implements
         IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
@@ -41,10 +42,13 @@
     private final int aggField;
 
     private final boolean isMax;
+    
+    private final boolean hasBinaryState;
 
-    public MinMaxStringAggregatorFactory(int aggField, boolean isMax) {
+    public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
         this.aggField = aggField;
         this.isMax = isMax;
+        this.hasBinaryState = hasBinaryState;
     }
 
     /*
@@ -63,8 +67,7 @@
         return new IFieldAggregateDescriptor() {
 
             @Override
-            public void reset(AggregateState state) {
-                state.reset();
+            public void reset() {
             }
 
             @Override
@@ -72,7 +75,7 @@
                     byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
-                    if (data != null) {
+                    if (hasBinaryState) {
                         int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
                         Object[] storedState = (Object[]) state.getState();
                         fieldOutput.writeUTF((String)storedState[stateIdx]);
@@ -90,11 +93,14 @@
                     int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
-                    if (data != null) {
+                    if (hasBinaryState) {
                         int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
                         Object[] storedState = (Object[]) state.getState();
                         fieldOutput.writeUTF((String)storedState[stateIdx]);
                     } else {
+                        if(((String)state.getState()).equalsIgnoreCase("ic platelets lose carefully. blithely unu")){
+                            System.out.print("");
+                        }
                         fieldOutput.writeUTF((String) state.getState());
                     }
                 } catch (IOException e) {
@@ -116,7 +122,7 @@
                                         .array(), tupleOffset
                                         + accessor.getFieldSlotsLength()
                                         + fieldStart, fieldLength)));
-                if (fieldOutput != null) {
+                if (hasBinaryState) {
                     // Object-binary-state
                     Object[] storedState = (Object[]) state.getState();
                     if (storedState == null) {
@@ -146,11 +152,6 @@
             }
 
             @Override
-            public AggregateState createState() {
-                return new AggregateState();
-            }
-
-            @Override
             public void close() {
                 // TODO Auto-generated method stub
 
@@ -169,7 +170,7 @@
                                         .array(), tupleOffset
                                         + accessor.getFieldSlotsLength()
                                         + fieldStart, fieldLength)));
-                if (data != null) {
+                if (hasBinaryState) {
                     int stateIdx = IntegerSerializerDeserializer.getInt(data,
                             offset);
                     Object[] storedState = (Object[]) state.getState();
@@ -199,6 +200,41 @@
                     }
                 }
             }
+
+            @Override
+            public IAggregateStateFactory getAggregateStateFactory() {
+                return new IAggregateStateFactory() {
+                    
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean hasObjectState() {
+                        return true;
+                    }
+                    
+                    @Override
+                    public boolean hasBinaryState() {
+                        return hasBinaryState;
+                    }
+                    
+                    @Override
+                    public int getStateLength() {
+                        return 4;
+                    }
+                    
+                    @Override
+                    public Object createState() {
+                        return null;
+                    }
+                };
+            }
+
+            @Override
+            public void initFromPartial(IFrameTupleAccessor accessor,
+                    int tIndex, DataOutput fieldOutput, AggregateState state)
+                    throws HyracksDataException {
+                init(accessor, tIndex, fieldOutput, state);
+            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
new file mode 100644
index 0000000..7fe636a
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+
+/**
+ *
+ */
+public class MultiFieldsAggregatorFactory implements
+        IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+
+    public MultiFieldsAggregatorFactory(
+            IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+        this.aggregatorFactories = aggregatorFactories;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields)
+            throws HyracksDataException {
+
+        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+        final IAggregateStateFactory[] aggregateStateFactories = new IAggregateStateFactory[aggregators.length];
+        for (int i = 0; i < aggregators.length; i++) {
+            aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+                    inRecordDescriptor, outRecordDescriptor);
+            aggregateStateFactories[i] = aggregators[i]
+                    .getAggregateStateFactory();
+        }
+
+        int stateTupleFieldCount = keyFields.length;
+        for (int i = 0; i < aggregateStateFactories.length; i++) {
+            if (aggregateStateFactories[i].hasBinaryState()) {
+                stateTupleFieldCount++;
+            }
+        }
+
+        final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(
+                stateTupleFieldCount);
+
+        final ArrayTupleBuilder resultTupleBuilder = new ArrayTupleBuilder(
+                outRecordDescriptor.getFields().length);
+
+        return new IAggregatorDescriptor() {
+
+            private boolean pending;
+
+            @Override
+            public void reset() {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].reset();
+                    aggregateStateFactories[i] = aggregators[i]
+                            .getAggregateStateFactory();
+                }
+                pending = false;
+            }
+
+            @Override
+            public boolean outputPartialResult(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!pending) {
+                    resultTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        resultTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    DataOutput dos = resultTupleBuilder.getDataOutput();
+
+                    int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                    for (int i = 0; i < aggregators.length; i++) {
+                        int fieldOffset = accessor.getFieldStartOffset(tIndex,
+                                keyFields.length + i);
+                        aggregators[i].outputPartialResult(dos, accessor
+                                .getBuffer().array(),
+                                fieldOffset + accessor.getFieldSlotsLength()
+                                        + tupleOffset,
+                                ((AggregateState[]) state.getState())[i]);
+                        resultTupleBuilder.addFieldEndOffset();
+                    }
+                }
+                if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
+                        resultTupleBuilder.getByteArray(), 0,
+                        resultTupleBuilder.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+
+            }
+
+            @Override
+            public boolean outputFinalResult(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!pending) {
+                    resultTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        resultTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    DataOutput dos = resultTupleBuilder.getDataOutput();
+
+                    int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                    for (int i = 0; i < aggregators.length; i++) {
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            int fieldOffset = accessor.getFieldStartOffset(
+                                    tIndex, keyFields.length + i);
+                            aggregators[i].outputFinalResult(dos, accessor
+                                    .getBuffer().array(), tupleOffset
+                                    + accessor.getFieldSlotsLength()
+                                    + fieldOffset, ((AggregateState[]) state
+                                    .getState())[i]);
+                        } else {
+                            aggregators[i].outputFinalResult(dos, null, 0,
+                                    ((AggregateState[]) state.getState())[i]);
+                        }
+                        resultTupleBuilder.addFieldEndOffset();
+                    }
+                }
+                if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
+                        resultTupleBuilder.getByteArray(), 0,
+                        resultTupleBuilder.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public boolean init(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!pending) {
+                    stateTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        stateTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    DataOutput dos = stateTupleBuilder.getDataOutput();
+
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].init(accessor, tIndex, dos,
+                                ((AggregateState[]) state.getState())[i]);
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            stateTupleBuilder.addFieldEndOffset();
+                        }
+                    }
+                }
+                if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
+                        stateTupleBuilder.getByteArray(), 0,
+                        stateTupleBuilder.getSize())) {
+                    pending = true;
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                AggregateState aggregateStates = new AggregateState();
+                AggregateState[] states = new AggregateState[aggregateStateFactories.length];
+                for (int i = 0; i < states.length; i++) {
+                    states[i] = new AggregateState();
+                    states[i]
+                            .setState(aggregateStateFactories[i].createState());
+                }
+                aggregateStates.setState(states);
+                return aggregateStates;
+            }
+
+            @Override
+            public int getAggregateStatesLength() {
+                int stateLength = 0;
+                for (int i = 0; i < aggregateStateFactories.length; i++) {
+                    stateLength += aggregateStateFactories[i].getStateLength();
+                }
+                return stateLength;
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (stateAccessor != null) {
+                    int stateTupleOffset = stateAccessor
+                            .getTupleStartOffset(stateTupleIndex);
+                    int fieldIndex = 0;
+                    for (int i = 0; i < aggregators.length; i++) {
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            int stateFieldOffset = stateAccessor
+                                    .getFieldStartOffset(stateTupleIndex,
+                                            keyFields.length + fieldIndex);
+                            aggregators[i].aggregate(
+                                    accessor,
+                                    tIndex,
+                                    stateAccessor.getBuffer().array(),
+                                    stateTupleOffset
+                                            + stateAccessor
+                                                    .getFieldSlotsLength()
+                                            + stateFieldOffset,
+                                    ((AggregateState[]) state.getState())[i]);
+                            fieldIndex++;
+                        } else {
+                            aggregators[i].aggregate(accessor, tIndex, null, 0,
+                                    ((AggregateState[]) state.getState())[i]);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].aggregate(accessor, tIndex, null, 0,
+                                ((AggregateState[]) state.getState())[i]);
+                    }
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 1e6766a..45468e7 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,10 +42,11 @@
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgMergeAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -140,9 +141,11 @@
                         keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new IntSumAggregatorFactory(3) }, outputRec, tableSize);
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec, tableSize);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
                 NC2_ID, NC1_ID);
@@ -193,12 +196,12 @@
                 frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new IntSumAggregatorFactory(3) },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new IntSumAggregatorFactory(3) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new IntSumFieldAggregatorFactory(3, false) }),
+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new IntSumFieldAggregatorFactory(3, false) }),
                 outputRec,
                 new HashSpillableTableFactory(
                         new FieldHashPartitionComputerFactory(
@@ -243,6 +246,7 @@
                 new ISerializerDeserializer[] {
                         UTF8StringSerializerDeserializer.INSTANCE,
                         IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
                         FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
@@ -255,9 +259,10 @@
                         keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new AvgAggregatorFactory(1) }, outputRec, tableSize);
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new CountFieldAggregatorFactory(true),
+                        new AvgFieldAggregatorFactory(1, true) }), outputRec, tableSize);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
                 NC2_ID, NC1_ID);
@@ -296,6 +301,7 @@
                 new ISerializerDeserializer[] {
                         UTF8StringSerializerDeserializer.INSTANCE,
                         IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
                         FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
@@ -308,12 +314,14 @@
                 frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new AvgAggregatorFactory(1) },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new AvgMergeAggregatorFactory(2) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new CountFieldAggregatorFactory(false),
+                        new AvgFieldAggregatorFactory(1, false) }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new CountFieldAggregatorFactory(false),
+                        new AvgFieldAggregatorFactory(2, false) }),
                 outputRec,
                 new HashSpillableTableFactory(
                         new FieldHashPartitionComputerFactory(
@@ -370,9 +378,10 @@
                         keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new MinMaxStringAggregatorFactory(15, true) }, outputRec, tableSize);
+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+                outputRec, tableSize);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
                 NC2_ID, NC1_ID);
@@ -423,12 +432,12 @@
                 frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new MinMaxStringAggregatorFactory(15, true) },
-                new IFieldAggregateDescriptorFactory[] {
-                        new IntSumAggregatorFactory(1),
-                        new MinMaxStringAggregatorFactory(2, true) },
+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new MinMaxStringFieldAggregatorFactory(2, true, true) }),
                 outputRec,
                 new HashSpillableTableFactory(
                         new FieldHashPartitionComputerFactory(
@@ -458,5 +467,5 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
 }