Merged -r 502:525 from hyracks_dev_next into branch

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@527 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
index f40e8cd..c433bf9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
@@ -67,7 +67,6 @@
             String[] opParts = null;
             if (!jas.allocations.containsKey(oid)) {
                 Set<ConstraintExpression> opConstraints = jas.opConstraints.get(oid);
-                System.err.println("Constraints: " + opConstraints);
                 for (ConstraintExpression ce : opConstraints) {
                     int nParts = getNumPartitions(oid, ce);
                     if (nParts != -1) {
@@ -137,12 +136,9 @@
     }
 
     private int getNumPartitions(OperatorDescriptorId oid, ConstraintExpression ce) {
-        System.err.println(ce);
         if (ce.getTag() == ExpressionTag.RELATIONAL) {
             RelationalExpression re = (RelationalExpression) ce;
             if (re.getOperator() == RelationalExpression.Operator.EQUAL) {
-                System.err.println("Left: " + re.getLeft());
-                System.err.println("Right: " + re.getRight());
                 if (re.getLeft().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
                     return getNumPartitions(oid, (PartitionCountExpression) re.getLeft(), re.getRight());
                 } else if (re.getRight().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
@@ -209,4 +205,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..123fedf
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/AvgAggregatorDescriptorFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * Aggregator factory computing the integer average of a field; the partial
+ * result keeps the running sum and the count.
+ *
+ * @author jarodwen
+ */
+public class AvgAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int avgField;
+    private int outField = -1;
+
+    public AvgAggregatorDescriptorFactory(int avgField) {
+        this.avgField = avgField;
+    }
+
+    public AvgAggregatorDescriptorFactory(int avgField, int outField) {
+        this.avgField = avgField;
+        this.outField = outField;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException {
+
+        if (this.outField < 0) {
+            this.outField = keyFields.length;
+        }
+
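+        // Partial-result layout (as written by init and outputPartialResult below):
+        // two consecutive 4-byte integers, the running sum followed by the count.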
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
+
+                try {
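+                    // Integer division: the emitted average is truncated toward zero.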
+                    tupleBuilder.getDataOutput().writeInt(sum / count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                int count = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
+                try {
+                    tupleBuilder.getDataOutput().writeInt(sum);
+                    tupleBuilder.getDataOutput().writeInt(count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                // Init aggregation value
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, avgField);
+                int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                int count = 1;
+
+                try {
+                    tupleBuilder.getDataOutput().writeInt(sum);
+                    tupleBuilder.getDataOutput().writeInt(count);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void close() {
+                // No internal state to release.
+
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int sum1 = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                int count1 = 1;
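+                // The tuple in the accessor is assumed to contribute a single
+                // record, hence a count of 1.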
+
+                int sum2 = IntegerSerializerDeserializer.getInt(data, offset);
+                int count2 = IntegerSerializerDeserializer.getInt(data, offset + 4);
+
+                ByteBuffer buf = ByteBuffer.wrap(data, offset, 8);
+                buf.putInt(sum1 + sum2);
+                buf.putInt(count1 + count2);
+
+                return 8;
+            }
+        };
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..ef6a139
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ConcatAggregatorDescriptorFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+/**
+ * Aggregator factory concatenating the string values of a field, accumulating
+ * the intermediate strings in an internal buffer table.
+ *
+ * @author jarodwen
+ */
+public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int INIT_ACCUMULATORS_SIZE = 8;
+
+    private final int concatField;
+    private int outField = -1;
+
+    /**
+     * Initialize the aggregator with the index of the field to be concatenated.
+     * 
+     * @param concatField
+     */
+    public ConcatAggregatorDescriptorFactory(int concatField) {
+        this.concatField = concatField;
+    }
+
+    /**
+     * Initialize the aggregator with the index of the field to be concatenated and
+     * the index of the field where the aggregation result will be written.
+     * 
+     * @param concatField
+     * @param outField
+     */
+    public ConcatAggregatorDescriptorFactory(int concatField, int outField) {
+        this.concatField = concatField;
+        this.outField = outField;
+    }
+
+    /**
+     * Create a concatenation aggregator. A byte buffer is allocated inside the
+     * aggregator to hold the partial aggregation results, and a reference is written
+     * to the output frame for locating each aggregation result within that buffer.
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+        if (this.outField < 0) {
+            this.outField = keyFields.length;
+        }
+
+        return new IAggregatorDescriptor() {
+
+            byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];
+
+            int currentAggregatorIndex = -1;
+            int aggregatorCount = 0;
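+
+            // buf holds one byte[] accumulator per group; each output tuple carries
+            // only a 4-byte index into this table, resolved when results are emitted.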
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                // Initialize the aggregation value
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+                int fieldLength = accessor.getFieldLength(tIndex, concatField);
+                int appendOffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+                // Claim the next accumulator slot, doubling the table when full
+                currentAggregatorIndex++;
+                if (currentAggregatorIndex >= buf.length) {
+                    byte[][] newBuf = new byte[buf.length * 2][];
+                    for (int i = 0; i < buf.length; i++) {
+                        newBuf[i] = buf[i];
+                    }
+                    this.buf = newBuf;
+                }
+                buf[currentAggregatorIndex] = new byte[fieldLength];
+                System.arraycopy(accessor.getBuffer().array(), appendOffset, buf[currentAggregatorIndex], 0,
+                        fieldLength);
+                // One more accumulator slot is now in use
+                aggregatorCount++;
+
+                try {
+                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void reset() {
+                currentAggregatorIndex = -1;
+                aggregatorCount = 0;
+            }
+
+            @Override
+            public void close() {
+                currentAggregatorIndex = -1;
+                aggregatorCount = 0;
+                for (int i = 0; i < buf.length; i++) {
+                    buf[i] = null;
+                }
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
+                // FIXME: should operate directly on the bytes instead of materializing strings
+                StringBuilder sbder = new StringBuilder();
+                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                        new ByteArrayInputStream(buf[refIndex]))));
+                // Get the new data
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, concatField);
+                int fieldLength = accessor.getFieldLength(tIndex, concatField);
+                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+                                + accessor.getFieldSlotsLength() + fieldStart, fieldLength))));
+
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
+                buf[refIndex] = baos.toByteArray();
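+                // Only the 4-byte reference lives in the tuple; the concatenated
+                // bytes themselves stay in the buffer table, so the length is 4.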
+                return 4;
+            }
+
+            @Override
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+                int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
+                        + accessor.getFieldSlotsLength() + fieldStart);
+
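+                // A negative reference indicates the value was written inline into the
+                // frame (after a 4-byte marker, see outputPartialResult) rather than
+                // kept in the buffer table.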
+                try {
+                    if (refIndex >= 0) {
+                        tupleBuilder.getDataOutput().write(buf[refIndex]);
+                    } else {
+                        int fieldLength = accessor.getFieldLength(tIndex, outField);
+                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                                tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4, fieldLength - 4);
+                    }
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, outField);
+                int fieldLength = accessor.getFieldLength(tIndex, outField);
+                int refIndex = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(), tupleOffset
+                        + accessor.getFieldSlotsLength() + fieldOffset);
+
+                try {
+                    tupleBuilder.getDataOutput().writeInt(-1);
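+                    // The -1 marker tells readers that the value follows inline
+                    // rather than residing in this aggregator's buffer table.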
+                    if (refIndex < 0) {
+                        tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                                tupleOffset + accessor.getFieldSlotsLength() + fieldOffset + 4, fieldLength - 4);
+                    } else {
+                        tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
+                    }
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..3a42fbc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorDescriptorFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * Aggregator factory counting the tuples of each group.
+ *
+ * @author jarodwen
+ */
+public class CountAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int outField = -1;
+
+    public CountAggregatorDescriptorFactory() {
+    }
+
+    public CountAggregatorDescriptorFactory(int outField) {
+        this.outField = outField;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+        if (this.outField < 0) {
+            this.outField = keyFields.length;
+        }
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
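+                // Increment the running count stored in place in the partial-result bytes.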
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                int count = buf.getInt(offset);
+                buf.putInt(offset, count + 1);
+                return 4;
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public void reset() {
+                // Stateless aggregator; nothing to reset.
+
+            }
+        };
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
new file mode 100644
index 0000000..dfc31cd
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
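+/**
+ * Descriptor for a partial-result-based aggregator. Judging from the
+ * implementations in this package, the expected life cycle is: init() for the
+ * first tuple of a group, aggregate() for each further tuple, then
+ * outputPartialResult() or outputResult() to emit, with reset() and close()
+ * recycling the aggregator.
+ */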
+public interface IAggregatorDescriptor {
+
+    /**
+     * Initialize the aggregator with an input tuple specified by the input
+     * frame and tuple index. This function will write the initialized partial
+     * result into the tuple builder.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param tupleBuilder
+     * @throws HyracksDataException
+     */
+    public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException;
+
+    /**
+     * Aggregate the input tuple into the partial result stored in the given bytes.
+     * The new value is then written back into the same byte range. It is the
+     * developer's responsibility to ensure that the new result does not exceed
+     * the given byte range.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param data
+     * @param offset
+     * @param length
+     * @return the length in bytes of the updated partial result
+     * @throws HyracksDataException
+     */
+    public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+            throws HyracksDataException;
+
+    /**
+     * Output the partial aggregation result to an array tuple builder.
+     * Any additional information needed to continue the aggregation must be
+     * preserved. For example, an aggregator calculating AVG should maintain both
+     * the running sum and the count as its partial result.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param tupleBuilder
+     * @throws HyracksDataException
+     */
+    public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException;
+
+    /**
+     * Output the final aggregation result to an array tuple builder.
+     * 
+     * @param accessor
+     * @param tIndex
+     * @param tupleBuilder
+     * @throws HyracksDataException
+     */
+    public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException;
+
+    /**
+     * Reset the internal state so that the aggregator can be reused.
+     */
+    public void reset();
+
+    /**
+     * Close the aggregator. Necessary clean-up code should be implemented here.
+     */
+    public void close();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..e324e1c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IAggregatorDescriptorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IAggregatorDescriptorFactory extends Serializable {
+
+    /**
+     * Create an aggregator.
+     * 
+     * @param ctx
+     * @param inRecordDescriptor
+     * @param outRecordDescriptor
+     * @param keyFields
+     * @return an aggregator instance
+     * @throws HyracksDataException
+     */
+    IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..85a182f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IntSumAggregatorDescriptorFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/**
+ * Aggregator factory computing the integer sum of a field.
+ *
+ * @author jarodwen
+ */
+public class IntSumAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final int aggField;
+    private int outField = -1;
+
+    public IntSumAggregatorDescriptorFactory(int aggField) {
+        this.aggField = aggField;
+    }
+
+    public IntSumAggregatorDescriptorFactory(int aggField, int outField) {
+        this.aggField = aggField;
+        this.outField = outField;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
+
+        if (this.outField < 0) {
+            this.outField = keyFields.length;
+        }
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, sum);
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                // Add the sum already stored in the partial-result bytes and write it back in place
+                ByteBuffer buf = ByteBuffer.wrap(data);
+                sum += buf.getInt(offset);
+                buf.putInt(offset, sum);
+                return 4;
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, outField);
+
+                try {
+                    tupleBuilder.getDataOutput().write(accessor.getBuffer().array(),
+                            tupleOffset + accessor.getFieldSlotsLength() + fieldStart, 4);
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write int sum as a partial result.");
+                }
+            }
+
+            @Override
+            public void reset() {
+                // Stateless aggregator; nothing to reset.
+
+            }
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..d30b2b2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorDescriptorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Composite aggregator factory that chains several aggregator factories and
+ * applies them field by field.
+ *
+ * @author jarodwen
+ */
+public class MultiAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IAggregatorDescriptorFactory[] aggregatorFactories;
+
+    public MultiAggregatorDescriptorFactory(IAggregatorDescriptorFactory[] aggregatorFactories) {
+        this.aggregatorFactories = aggregatorFactories;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksStageletContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksStageletContext ctx,
+            final RecordDescriptor inRecordDescriptor, final RecordDescriptor outRecordDescriptor, final int[] keyFields)
+            throws HyracksDataException {
+
+        final IAggregatorDescriptor[] aggregators = new IAggregatorDescriptor[this.aggregatorFactories.length];
+        for (int i = 0; i < aggregators.length; i++) {
+            aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
+                    keyFields);
+        }
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].init(accessor, tIndex, tupleBuilder);
+                }
+            }
+
+            @Override
+            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
+                    throws HyracksDataException {
+                int adjust = 0;
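+                // Each child aggregator owns a slice of the partial-result bytes;
+                // 'adjust' advances the offset by the slice length each child reports.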
+                for (int i = 0; i < aggregators.length; i++) {
+                    adjust += aggregators[i].aggregate(accessor, tIndex, data, offset + adjust, length - adjust);
+                }
+                return adjust;
+            }
+
+            @Override
+            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].outputPartialResult(accessor, tIndex, tupleBuilder);
+                }
+            }
+
+            @Override
+            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
+                    throws HyracksDataException {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].outputResult(accessor, tIndex, tupleBuilder);
+                }
+            }
+
+            @Override
+            public void close() {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].close();
+                }
+            }
+
+            @Override
+            public void reset() {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].reset();
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
index 957c453..eb9c12d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
 import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.ISpillableAccumulatingAggregator;
 
 public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
     private static final long serialVersionUID = 1L;
@@ -36,92 +35,6 @@
     }
 
     @Override
-    public ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
-            RecordDescriptor inRecordDesc, final RecordDescriptor outRecordDescriptor) {
-        final ISpillableFieldValueResultingAggregator aggregators[] = new ISpillableFieldValueResultingAggregator[aFactories.length];
-        for (int i = 0; i < aFactories.length; ++i) {
-            aggregators[i] = aFactories[i].createSpillableFieldValueResultingAggregator();
-        }
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
-        return new ISpillableAccumulatingAggregator() {
-            private boolean pending;
-
-            @Override
-            public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
-                    int[] keyFieldIndexes) throws HyracksDataException {
-                if (!pending) {
-                    tb.reset();
-                    for (int i = 0; i < keyFieldIndexes.length; ++i) {
-                        tb.addField(accessor, tIndex, keyFieldIndexes[i]);
-                    }
-                    DataOutput dos = tb.getDataOutput();
-                    for (int i = 0; i < aggregators.length; ++i) {
-                        aggregators[i].output(dos);
-                        tb.addFieldEndOffset();
-                    }
-                }
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    pending = true;
-                    return false;
-                }
-                return true;
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                tb.reset();
-                for (int i = 0; i < aggregators.length; ++i) {
-                    aggregators[i].init(accessor, tIndex);
-                }
-                pending = false;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; ++i) {
-                    aggregators[i].accumulate(accessor, tIndex);
-                }
-            }
-
-            @Override
-            public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
-                    throws HyracksDataException {
-                tb.reset();
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].initFromPartial(accessor, tIndex, keyFieldIndexes.length + i);
-                }
-                pending = false;
-            }
-
-            @Override
-            public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
-                    throws HyracksDataException {
-                for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].accumulatePartialResult(accessor, tIndex, keyFieldIndexes.length + i);
-                }
-
-            }
-
-            @Override
-            public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException {
-                if (!pending) {
-                    // TODO Here to be fixed:
-                    DataOutput dos = tbder.getDataOutput();
-                    for (int i = 0; i < aggregators.length; ++i) {
-                        aggregators[i].output(dos);
-                        tbder.addFieldEndOffset();
-                    }
-                }
-                if (!appender.append(tbder.getFieldEndOffsets(), tbder.getByteArray(), 0, tbder.getSize())) {
-                    pending = true;
-                    return false;
-                }
-                return true;
-            }
-        };
-    }
-
-    @Override
     public IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
index 4516c8d..5d94c12 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordReader.java
@@ -16,8 +16,8 @@
 
 public interface IRecordReader {
 
-	  public boolean read(Object[] record) throws Exception;
+    public boolean read(Object[] record) throws Exception;
 
-      public void close();
-      
+    public void close();
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
index a319727..679088f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IRecordWriter.java
@@ -17,10 +17,9 @@
 import java.io.File;
 
 public interface IRecordWriter {
-	  
-      public void close();
-      
-      public void write(Object[] record) throws Exception;
 
-	
+    public void close();
+
+    public void write(Object[] record) throws Exception;
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 530207d..d599fee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -63,7 +63,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-    throws HyracksDataException {
+            throws HyracksDataException {
         // Output files
         final FileSplit[] splits = fileSplitProvider.getFileSplits();
         // Frame accessor
@@ -95,7 +95,7 @@
                     frameTupleAccessor.reset(buffer);
                     for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
                         int start = frameTupleAccessor.getTupleStartOffset(tIndex)
-                        + frameTupleAccessor.getFieldSlotsLength();
+                                + frameTupleAccessor.getFieldSlotsLength();
                         bbis.setByteBuffer(buffer, start);
                         Object[] record = new Object[recordDescriptor.getFields().length];
                         for (int i = 0; i < record.length; ++i) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index aa6635d..38ac1a3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -66,14 +66,14 @@
     protected IRecordReader createRecordReader(File file, RecordDescriptor desc) throws Exception {
         return new RecordReaderImpl(file, desc);
     }
-    
+
     @Override
-	protected void configure() throws Exception {
-		// currently a no-op, but is meant to initialize , if required before it is asked 
-		// to create a record reader
-		// this is executed at the node and is useful for operators that could not be 
-		// initialized from the client completely, because of lack of information specific 
-		// to the node where the operator gets executed. 
-		
-	}
+    protected void configure() throws Exception {
+        // Currently a no-op. This hook performs any initialization required before
+        // the operator is asked to create a record reader. It runs on the node,
+        // which makes it useful for operators that cannot be fully initialized at
+        // the client for lack of information specific to the node where they execute.
+
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
index 1c52b25..1858a73 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordWriter.java
@@ -14,73 +14,72 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.file;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 
-public abstract class RecordWriter implements IRecordWriter{
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
 
-	 
-	protected final BufferedWriter bufferedWriter;
+public abstract class RecordWriter implements IRecordWriter {
+
+    protected final BufferedWriter bufferedWriter;
     protected final int[] columns;
     protected final char separator;
-    
-    public static final char COMMA = ',';
-    
-    public RecordWriter(Object [] args) throws Exception{
-    	OutputStream outputStream = createOutputStream(args);
-    	if(outputStream != null){
-    		bufferedWriter = new BufferedWriter(new OutputStreamWriter(createOutputStream(args)));
-    	}else{
-    		bufferedWriter = null;
-    	}
-    	this.columns = null;
-    	this.separator = COMMA;
-    }
-    
-    public RecordWriter(int []columns, char separator, Object[] args) throws Exception{
-    	OutputStream outputStream = createOutputStream(args);
-    	if(outputStream != null){
-    		bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream));
-    	}else{
-    		bufferedWriter = null;
-    	}
-    	this.columns = columns;
-    	this.separator = separator;
-    }
-    
-	@Override
-     public void close() {
-         try {
-             bufferedWriter.close();
-         } catch (IOException e) {
-             e.printStackTrace();
-         }
-     }
 
-     @Override
-     public void write(Object[] record) throws Exception {
-         if (columns == null) {
-             for (int i = 0; i < record.length; ++i) {
-                 if (i != 0) {
-                     bufferedWriter.write(separator);
-                 }
-                 bufferedWriter.write(String.valueOf(record[i]));
-             }
-         } else {
-             for (int i = 0; i < columns.length; ++i) {
-                 if (i != 0) {
-                     bufferedWriter.write(separator);
-                 }
-                 bufferedWriter.write(String.valueOf(record[columns[i]]));
-             }
-         }
-         bufferedWriter.write("\n");
-     }
-     
-     public abstract OutputStream createOutputStream(Object[] args) throws Exception;
-   
+    public static final char COMMA = ',';
+
+    public RecordWriter(Object[] args) throws Exception {
+        OutputStream outputStream = createOutputStream(args);
+        if (outputStream != null) {
+            bufferedWriter = new BufferedWriter(new OutputStreamWriter(createOutputStream(args)));
+        } else {
+            bufferedWriter = null;
+        }
+        this.columns = null;
+        this.separator = COMMA;
+    }
+
+    public RecordWriter(int[] columns, char separator, Object[] args) throws Exception {
+        OutputStream outputStream = createOutputStream(args);
+        if (outputStream != null) {
+            bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream));
+        } else {
+            bufferedWriter = null;
+        }
+        this.columns = columns;
+        this.separator = separator;
+    }
+
+    @Override
+    public void close() {
+        try {
+            bufferedWriter.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void write(Object[] record) throws Exception {
+        if (columns == null) {
+            for (int i = 0; i < record.length; ++i) {
+                if (i != 0) {
+                    bufferedWriter.write(separator);
+                }
+                bufferedWriter.write(StringSerializationUtils.toString(record[i]));
+            }
+        } else {
+            for (int i = 0; i < columns.length; ++i) {
+                if (i != 0) {
+                    bufferedWriter.write(separator);
+                }
+                bufferedWriter.write(StringSerializationUtils.toString(record[columns[i]]));
+            }
+        }
+        bufferedWriter.write("\n");
+    }
+
+    public abstract OutputStream createOutputStream(Object[] args) throws Exception;
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 4b7aff3..fc5f282 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -23,98 +23,97 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 
-public class DeserializedPreclusteredGroupOperator implements
-		IOpenableDataWriterOperator {
-	private final int[] groupFields;
+public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
+    private final int[] groupFields;
 
-	private final IComparator[] comparators;
+    private final IComparator[] comparators;
 
-	private final IGroupAggregator aggregator;
+    private final IGroupAggregator aggregator;
 
-	private Object[] lastData;
+    private Object[] lastData;
 
-	private IOpenableDataWriter<Object[]> writer;
+    private IOpenableDataWriter<Object[]> writer;
 
-	private List<Object[]> buffer;
+    private List<Object[]> buffer;
 
-	private IOpenableDataReader<Object[]> reader;
+    private IOpenableDataReader<Object[]> reader;
 
-	public DeserializedPreclusteredGroupOperator(int[] groupFields,
-			IComparator[] comparators, IGroupAggregator aggregator) {
-		this.groupFields = groupFields;
-		this.comparators = comparators;
-		this.aggregator = aggregator;
-		buffer = new ArrayList<Object[]>();
-		reader = new IOpenableDataReader<Object[]>() {
-			private int idx;
+    public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators,
+            IGroupAggregator aggregator) {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.aggregator = aggregator;
+        buffer = new ArrayList<Object[]>();
+        reader = new IOpenableDataReader<Object[]>() {
+            private int idx;
 
-			@Override
-			public void open() {
-				idx = 0;
-			}
+            @Override
+            public void open() {
+                idx = 0;
+            }
 
-			@Override
-			public void close() {
-			}
+            @Override
+            public void close() {
+            }
 
-			@Override
-			public Object[] readData() {
-				return idx >= buffer.size() ? null : buffer.get(idx++);
-			}
-		};
-	}
+            @Override
+            public Object[] readData() {
+                return idx >= buffer.size() ? null : buffer.get(idx++);
+            }
+        };
+    }
 
-	@Override
-	public void close() throws HyracksDataException {
-		if (!buffer.isEmpty()) {
-			aggregate();
-		}
-		writer.close();
-		try {
-			aggregator.close();
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+    @Override
+    public void close() throws HyracksDataException {
+        if (!buffer.isEmpty()) {
+            aggregate();
+        }
+        writer.close();
+        try {
+            aggregator.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	private void aggregate() throws HyracksDataException {
-		reader.open();
-		aggregator.aggregate(reader, writer);
-		reader.close();
-		buffer.clear();
-	}
+    private void aggregate() throws HyracksDataException {
+        reader.open();
+        aggregator.aggregate(reader, writer);
+        reader.close();
+        buffer.clear();
+    }
 
-	@Override
-	public void open() throws HyracksDataException {
-		lastData = null;
-		writer.open();
-	}
+    @Override
+    public void open() throws HyracksDataException {
+        lastData = null;
+        writer.open();
+    }
 
-	@Override
-	public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
-		if (index != 0) {
-			throw new IllegalArgumentException();
-		}
-		this.writer = writer;
-	}
+    @Override
+    public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+        if (index != 0) {
+            throw new IllegalArgumentException();
+        }
+        this.writer = writer;
+    }
 
-	@Override
-	public void writeData(Object[] data) throws HyracksDataException {
-		if (lastData != null && compare(data, lastData) != 0) {
-			aggregate();
-		}
-		lastData = data;
-		buffer.add(data);
-	}
+    @Override
+    public void writeData(Object[] data) throws HyracksDataException {
+        if (lastData != null && compare(data, lastData) != 0) {
+            aggregate();
+        }
+        lastData = data;
+        buffer.add(data);
+    }
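+    // Preclustered contract (illustrative): input must arrive clustered on
+    // groupFields. For keys 1,1,2,2,3 the aggregate() calls fire at the
+    // 1->2 and 2->3 boundaries; unclustered input such as 1,2,1 would
+    // silently yield two separate groups for key 1.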
 
-	private int compare(Object[] d1, Object[] d2) {
-		for (int i = 0; i < groupFields.length; ++i) {
-			int fIdx = groupFields[i];
-			int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
-			if (c != 0) {
-				return c;
-			}
-		}
-		return 0;
-	}
+    private int compare(Object[] d1, Object[] d2) {
+        for (int i = 0; i < groupFields.length; ++i) {
+            int fIdx = groupFields[i];
+            int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
new file mode 100644
index 0000000..cdb0f4d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    /**
+     * The identifier of the in-memory group table (in the job environment)
+     */
+    private static final String GROUPTABLES = "gtables";
+    /**
+     * The identifier of the run files (in the job environment)
+     */
+    private static final String RUNS = "runs";
+    private final int[] keyFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final IAggregatorDescriptorFactory mergeFactory;
+    private final int framesLimit;
+    private final ISpillableTableFactory spillableTableFactory;
+    private final boolean isOutputSorted;
+
+    public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergeFactory,
+            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        if (framesLimit <= 1) {
+            // Minimum of 2 frames: 1 for input records, and 1 for output
+            // aggregation results.
+            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+        }
+
+        this.aggregatorFactory = aggregatorFactory;
+        this.mergeFactory = mergeFactory;
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.spillableTableFactory = spillableTableFactory;
+        this.isOutputSorted = isOutputSorted;
+
+        // Set the record descriptor. Note that since this operator is a
+        // unary operator, only the first record descriptor is used here.
+        recordDescriptors[0] = recordDescriptor;
+    }
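+    // Hypothetical usage sketch (the factory names below are illustrative,
+    // not part of this patch): grouping on field 0 with an 8-frame budget
+    // and sorted output.
+    //
+    //   IOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+    //           spec, new int[] { 0 }, 8, comparatorFactories,
+    //           normalizerFactory, aggregatorFactory, mergeFactory,
+    //           outRecordDescriptor, spillableTableFactory, true);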
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        AggregateActivity aggregateAct = new AggregateActivity();
+        MergeActivity mergeAct = new MergeActivity();
+
+        builder.addTask(aggregateAct);
+        builder.addSourceEdge(0, aggregateAct, 0);
+
+        builder.addTask(mergeAct);
+        builder.addTargetEdge(0, mergeAct, 0);
+
+        builder.addBlockingEdge(aggregateAct, mergeAct);
+    }
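+    // The blocking edge encodes the phase boundary: MergeActivity may only
+    // start after AggregateActivity finishes, since it consumes the group
+    // table and run files that the aggregate phase publishes in the operator
+    // environment under GROUPTABLES and RUNS.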
+
+    private class AggregateActivity extends AbstractActivityNode {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return ExternalGroupOperatorDescriptor.this;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) throws HyracksDataException {
+            final ISpillableTable gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields,
+                    comparatorFactories, firstNormalizerFactory, aggregatorFactory,
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                    ExternalGroupOperatorDescriptor.this.framesLimit);
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+
+                /**
+                 * Run files
+                 */
+                private LinkedList<RunFileReader> runs;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    runs = new LinkedList<RunFileReader>();
+                    gTable.reset();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        // If the group table is too large, flush the table into
+                        // a run file.
+                        if (!gTable.insert(accessor, i)) {
+                            flushFramesToRun();
+                            if (!gTable.insert(accessor, i))
+                                throw new HyracksDataException(
+                                        "Failed to insert a tuple into the group table after flushing it!");
+                        }
+                    }
+                }
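+                // Insert-or-spill pattern: a failed insert means the table is
+                // full, so it is spilled to a sorted run and the insert is
+                // retried against the emptied table; a second failure can only
+                // mean a single tuple exceeds the memory budget.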
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    // No-op: spilling happens internally when the group table
+                    // fills up, so there is nothing to flush here.
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (gTable.getFrameCount() >= 0) {
+                        if (runs.size() <= 0) {
+                            // All in memory
+                            env.set(GROUPTABLES, gTable);
+                        } else {
+                            // flush the memory into the run file.
+                            flushFramesToRun();
+                            gTable.close();
+                        }
+                    }
+                    env.set(RUNS, runs);
+                }
+
+                private void flushFramesToRun() throws HyracksDataException {
+                    FileReference runFile;
+                    try {
+                        runFile = ctx.getJobletContext().createWorkspaceFile(
+                                ExternalGroupOperatorDescriptor.class.getSimpleName());
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+                    writer.open();
+                    try {
+                        gTable.sortFrames();
+                        gTable.flushFrames(writer, true);
+                    } catch (Exception ex) {
+                        throw new HyracksDataException(ex);
+                    } finally {
+                        writer.close();
+                    }
+                    gTable.reset();
+                    runs.add(writer.createReader());
+                }
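+                // Each spilled run is sorted on the group keys (sortFrames()
+                // precedes flushFrames()), which is what lets MergeActivity
+                // combine runs with a simple priority-queue merge.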
+            };
+            return op;
+        }
+    }
+
+    private class MergeActivity extends AbstractActivityNode {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return ExternalGroupOperatorDescriptor.this;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) throws HyracksDataException {
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            final IAggregatorDescriptor currentWorkingAggregator = mergeFactory.createAggregator(ctx,
+                    recordDescriptors[0], recordDescriptors[0], keyFields);
+            final int[] storedKeys = new int[keyFields.length];
+            // In the stored (partially aggregated) records the key fields
+            // are clustered at positions 0 to keyFields.length - 1.
+            for (int i = 0; i < keyFields.length; ++i) {
+                storedKeys[i] = i;
+            }
+            // Tuple builder
+            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+                /**
+                 * Input frames, one for each run file.
+                 */
+                private List<ByteBuffer> inFrames;
+
+                /**
+                 * Output frame.
+                 */
+                private ByteBuffer outFrame, writerFrame;
+
+                /**
+                 * List of the run files to be merged
+                 */
+                private LinkedList<RunFileReader> runs;
+
+                /**
+                 * How many frames to read ahead at once for each run
+                 */
+                private int runFrameLimit = 1;
+
+                private int[] currentFrameIndexInRun;
+                private int[] currentRunFrames;
+                private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                        recordDescriptors[0]);
+                private ArrayTupleBuilder finalTupleBuilder;
+                private FrameTupleAppender writerFrameAppender;
+
+                @SuppressWarnings("unchecked")
+                public void initialize() throws HyracksDataException {
+                    runs = (LinkedList<RunFileReader>) env.get(RUNS);
+                    writer.open();
+                    try {
+                        if (runs.size() <= 0) {
+                            ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
+                            if (gTable != null) {
+                                if (isOutputSorted)
+                                    gTable.sortFrames();
+                                gTable.flushFrames(writer, false);
+                            }
+                            env.set(GROUPTABLES, null);
+                        } else {
+                            long start = System.currentTimeMillis();
+                            inFrames = new ArrayList<ByteBuffer>();
+                            outFrame = ctx.allocateFrame();
+                            outFrameAppender.reset(outFrame, true);
+                            outFrameAccessor.reset(outFrame);
+                            while (runs.size() > 0) {
+                                try {
+                                    doPass(runs);
+                                } catch (Exception e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+                            inFrames.clear();
+                            long end = System.currentTimeMillis();
+                            System.out.println("merge time " + (end - start));
+                        }
+                    } finally {
+                        writer.close();
+                    }
+                    env.set(RUNS, null);
+                }
+
+                private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+                    FileReference newRun = null;
+                    IFrameWriter writer = this.writer;
+                    boolean finalPass = false;
+
+                    while (inFrames.size() + 2 < framesLimit) {
+                        inFrames.add(ctx.allocateFrame());
+                    }
+                    int runNumber;
+                    if (runs.size() + 2 <= framesLimit) {
+                        finalPass = true;
+                        runFrameLimit = (framesLimit - 2) / runs.size();
+                        runNumber = runs.size();
+                    } else {
+                        runNumber = framesLimit - 2;
+                        newRun = ctx.getJobletContext().createWorkspaceFile(
+                                ExternalGroupOperatorDescriptor.class.getSimpleName());
+                        writer = new RunFileWriter(newRun, ctx.getIOManager());
+                        writer.open();
+                    }
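+                    // Worked example (illustrative): with framesLimit = 10 and
+                    // 4 remaining runs, 4 + 2 <= 10, so this is the final pass
+                    // with runFrameLimit = (10 - 2) / 4 = 2 read-ahead frames
+                    // per run; with 20 runs, only 8 (= 10 - 2) of them are
+                    // merged into one new run, leaving 13 for the next pass.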
+                    try {
+                        currentFrameIndexInRun = new int[runNumber];
+                        currentRunFrames = new int[runNumber];
+                        // Create file readers for each input run file, but
+                        // only for the ones that fit into the inFrames
+                        RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+                        // Create input frame accessor
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+                        // Build a priority queue for extracting tuples in order
+                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+                                recordDescriptors[0], runNumber, comparator);
+                        // Track the next tuple index to visit in each run's
+                        // current frame
+                        int[] tupleIndices = new int[runNumber];
+                        for (int runIndex = runNumber - 1; runIndex >= 0; runIndex--) {
+                            tupleIndices[runIndex] = 0;
+                            // Load the run file
+                            runFileReaders[runIndex] = runs.get(runIndex);
+                            runFileReaders[runIndex].open();
+
+                            currentRunFrames[runIndex] = 0;
+                            currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+                            for (int j = 0; j < runFrameLimit; j++) {
+                                int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                                if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
+                                            recordDescriptors[0]);
+                                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                                    currentRunFrames[runIndex]++;
+                                    if (j == 0)
+                                        setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
+                                                topTuples);
+                                } else {
+                                    break;
+                                }
+                            }
+                        }
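+                        // Frame layout (illustrative): with runFrameLimit = 2,
+                        // run 0 reads ahead into inFrames slots 0-1, run 1 into
+                        // slots 2-3, and so on (runIndex * runFrameLimit + j).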
+
+                        // Start merging
+                        while (!topTuples.areRunsExhausted()) {
+                            // Get the top record
+                            ReferenceEntry top = topTuples.peek();
+                            int tupleIndex = top.getTupleIndex();
+                            int runIndex = topTuples.peek().getRunid();
+                            FrameTupleAccessor fta = top.getAccessor();
+
+                            int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                            if (currentTupleInOutFrame < 0
+                                    || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+                                // Initialize the first output record
+                                // Reset the tuple builder
+                                tupleBuilder.reset();
+                                for (int i = 0; i < keyFields.length; i++) {
+                                    tupleBuilder.addField(fta, tupleIndex, i);
+                                }
+
+                                currentWorkingAggregator.init(fta, tupleIndex, tupleBuilder);
+                                if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+                                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                                    // Make sure that when the outFrame is being
+                                    // flushed, all results in it are in
+                                    // the correct state
+                                    flushOutFrame(writer, finalPass);
+                                    if (!outFrameAppender.append(tupleBuilder.getFieldEndOffsets(),
+                                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()))
+                                        throw new HyracksDataException(
+                                                "Failed to append an aggregation result to the output frame.");
+                                }
+                            } else {
+                                // If the new tuple is in the same group as the
+                                // current aggregate, merge it into the partial
+                                // result held in the outFrame.
+                                int tupleOffset = outFrameAccessor.getTupleStartOffset(currentTupleInOutFrame);
+                                int fieldOffset = outFrameAccessor.getFieldStartOffset(currentTupleInOutFrame,
+                                        keyFields.length);
+                                int fieldLength = outFrameAccessor.getFieldLength(currentTupleInOutFrame,
+                                        keyFields.length);
+                                currentWorkingAggregator.aggregate(fta, tupleIndex, outFrameAccessor.getBuffer()
+                                        .array(), tupleOffset + outFrameAccessor.getFieldSlotsLength() + fieldOffset,
+                                        fieldLength);
+                            }
+                            tupleIndices[runIndex]++;
+                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+                        }
+                        // Flush the outFrame
+                        if (outFrameAppender.getTupleCount() > 0) {
+                            flushOutFrame(writer, finalPass);
+                        }
+                        // After processing all records, close the aggregator
+                        currentWorkingAggregator.close();
+                        runs.subList(0, runNumber).clear();
+                        // insert the new run file into the beginning of the run
+                        // file list
+                        if (!finalPass) {
+                            runs.add(0, ((RunFileWriter) writer).createReader());
+                        }
+                    } finally {
+                        if (!finalPass) {
+                            writer.close();
+                        }
+                    }
+                }
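+                // Each non-final pass replaces framesLimit - 2 runs with one
+                // new run, shrinking the run list by framesLimit - 3, so the
+                // loop in initialize() terminates once runs.size() + 2 fits
+                // within framesLimit.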
+
+                private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+                    if (finalTupleBuilder == null) {
+                        finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
+                    }
+                    if (writerFrame == null) {
+                        writerFrame = ctx.allocateFrame();
+                    }
+                    if (writerFrameAppender == null) {
+                        writerFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                        writerFrameAppender.reset(writerFrame, true);
+                    }
+                    outFrameAccessor.reset(outFrame);
+                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+                        finalTupleBuilder.reset();
+                        for (int j = 0; j < keyFields.length; j++) {
+                            finalTupleBuilder.addField(outFrameAccessor, i, j);
+                        }
+                        if (isFinal)
+                            currentWorkingAggregator.outputResult(outFrameAccessor, i, finalTupleBuilder);
+                        else
+                            currentWorkingAggregator.outputPartialResult(outFrameAccessor, i, finalTupleBuilder);
+
+                        if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+                                finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                            FrameUtils.flushFrame(writerFrame, writer);
+                            writerFrameAppender.reset(writerFrame, true);
+                            if (!writerFrameAppender.append(finalTupleBuilder.getFieldEndOffsets(),
+                                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize()))
+                                throw new HyracksDataException(
+                                        "Failed to write final aggregation result to a writer frame!");
+                        }
+                    }
+                    if (writerFrameAppender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(writerFrame, writer);
+                        writerFrameAppender.reset(writerFrame, true);
+                    }
+                    outFrameAppender.reset(outFrame, true);
+                }
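+                // isFinal selects between outputResult and outputPartialResult:
+                // for example, an AVG aggregator would emit the final quotient
+                // in the former case but the (sum, count) pair in the latter,
+                // so later merge passes can keep combining (AVG used only as
+                // an illustration here).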
+
+                private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+                        throws HyracksDataException {
+                    int runStart = runIndex * runFrameLimit;
+                    boolean existNext = false;
+                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+                        // run already closed
+                        existNext = false;
+                    } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+                        // not the last frame for this run
+                        existNext = true;
+                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+                            tupleIndices[runIndex] = 0;
+                            currentFrameIndexInRun[runIndex]++;
+                        }
+                    } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
+                            .getTupleCount()) {
+                        // this is the last loaded frame of this run, but it
+                        // still has unread tuples
+                        existNext = true;
+                    } else {
+                        // All tuples in the current batch of frames for this
+                        // run have been consumed; reload the next batch.
+                        int frameOffset = runIndex * runFrameLimit;
+                        tupleIndices[runIndex] = 0;
+                        currentFrameIndexInRun[runIndex] = frameOffset;
+                        // Read ahead up to runFrameLimit frames in one batch.
+                        currentRunFrames[runIndex] = 0;
+                        for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
+                            ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
+                            if (runCursors[runIndex].nextFrame(buffer)) {
+                                tupleAccessors[frameOffset].reset(buffer);
+                                if (tupleAccessors[frameOffset].getTupleCount() > 0) {
+                                    existNext = true;
+                                } else {
+                                    throw new IllegalStateException("illegal: empty run file");
+                                }
+                                currentRunFrames[runIndex]++;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                    // Check whether the run file for the given runIndex has
+                    // more tuples
+                    if (existNext) {
+                        topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
+                                tupleIndices[runIndex]);
+                    } else {
+                        topTuples.pop();
+                        closeRun(runIndex, runCursors, tupleAccessors);
+                    }
+                }
+
+                /**
+                 * Close the run file reader for the given run index, if it is
+                 * still open.
+                 * 
+                 * @param index
+                 * @param runCursors
+                 * @param tupleAccessor
+                 * @throws HyracksDataException
+                 */
+                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+                        throws HyracksDataException {
+                    if (runCursors[index] != null) {
+                        runCursors[index].close();
+                        runCursors[index] = null;
+                    }
+                }
+
+                private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        // Note: since this comparison is only used in the merge
+                        // phase, all keys are clustered at the beginning of the
+                        // tuple.
+                        int fIdx = f;
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldLength(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldLength(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+            };
+            return op;
+        }
+
+        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+            return new Comparator<ReferenceEntry>() {
+
+                @Override
+                public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+                    int j1 = o1.getTupleIndex();
+                    int j2 = o2.getTupleIndex();
+                    byte[] b1 = fta1.getBuffer().array();
+                    byte[] b2 = fta2.getBuffer().array();
+                    for (int f = 0; f < keyFields.length; ++f) {
+                        // Note: since this comparison is only used in the merge
+                        // phase, all keys are clustered at the beginning of the
+                        // tuple.
+                        int fIdx = f;
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                                + fta1.getFieldStartOffset(j1, fIdx);
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                                + fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                        if (c != 0) {
+                            return c;
+                        }
+                    }
+                    return 0;
+                }
+
+            };
+        }
+
+    }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
deleted file mode 100644
index b13b087..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
-
-/**
- * This is an implementation of the external hash group operator.
- * The motivation of this operator is that when tuples are processed in
- * parallel, distinguished aggregating keys partitioned on one node may exceed
- * the main memory, so aggregation results should be output onto the disk to
- * make space for aggregating more input tuples.
- */
-public class ExternalHashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
-    /**
-     * The input frame identifier (in the job environment)
-     */
-    private static final String GROUPTABLES = "gtables";
-
-    /**
-     * The runs files identifier (in the job environment)
-     */
-    private static final String RUNS = "runs";
-
-    /**
-     * The fields used for grouping (grouping keys).
-     */
-    private final int[] keyFields;
-
-    /**
-     * The comparator for checking the grouping conditions, corresponding to the {@link #keyFields}.
-     */
-    private final IBinaryComparatorFactory[] comparatorFactories;
-
-    /**
-     * The aggregator factory for the aggregating field, corresponding to the {@link #aggregateFields}.
-     */
-    private IAccumulatingAggregatorFactory aggregatorFactory;
-
-    /**
-     * The maximum number of frames in the main memory.
-     */
-    private final int framesLimit;
-
-    /**
-     * Indicate whether the final output will be sorted or not.
-     */
-    private final boolean sortOutput;
-
-    /**
-     * Partition computer factory
-     */
-    private final ITuplePartitionComputerFactory tpcf;
-
-    /**
-     * The size of the in-memory table, which should be specified now by the
-     * creator of this operator descriptor.
-     */
-    private final int tableSize;
-
-    /**
-     * XXX Logger for debug information
-     */
-    private static Logger LOGGER = Logger.getLogger(ExternalHashGroupOperatorDescriptor.class.getName());
-
-    /**
-     * Constructor of the external hash group operator descriptor.
-     * 
-     * @param spec
-     * @param keyFields
-     *            The fields as keys of grouping.
-     * @param framesLimit
-     *            The maximum number of frames to be used in memory.
-     * @param sortOutput
-     *            Whether the output should be sorted or not. Note that if the
-     *            input data is large enough for external grouping, the output
-     *            will be sorted surely. The only case that when the output is
-     *            not sorted is when the size of the input data can be grouped
-     *            in memory and this parameter is false.
-     * @param tpcf
-     *            The partitioner.
-     * @param comparatorFactories
-     *            The comparators.
-     * @param aggregatorFactory
-     *            The aggregators.
-     * @param recordDescriptor
-     *            The record descriptor for the input data.
-     * @param tableSize
-     *            The maximum size of the in memory table usable to this
-     *            operator.
-     */
-    public ExternalHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
-            boolean sortOutput, ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
-            IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor recordDescriptor, int tableSize) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        if (framesLimit <= 1) {
-            // Minimum of 2 frames: 1 for input records, and 1 for output
-            // aggregation results.
-            throw new IllegalStateException();
-        }
-        this.aggregatorFactory = aggregatorFactory;
-        this.keyFields = keyFields;
-        this.comparatorFactories = comparatorFactories;
-
-        this.sortOutput = sortOutput;
-
-        this.tpcf = tpcf;
-
-        this.tableSize = tableSize;
-
-        // Set the record descriptor. Note that since this operator is a unary
-        // operator,
-        // only the first record descritpor is used here.
-        recordDescriptors[0] = recordDescriptor;
-    }
-
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeTaskGraph
-     * (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
-     */
-    @Override
-    public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
-        MergeActivity mergeAct = new MergeActivity();
-
-        builder.addTask(partialAggAct);
-        builder.addSourceEdge(0, partialAggAct, 0);
-
-        builder.addTask(mergeAct);
-        builder.addTargetEdge(0, mergeAct, 0);
-
-        // FIXME Block or not?
-        builder.addBlockingEdge(partialAggAct, mergeAct);
-
-    }
-
-    private class PartialAggregateActivity extends AbstractActivityNode {
-
-        /**
-         * 
-         */
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
-                final IOperatorEnvironment env, final IRecordDescriptorProvider recordDescProvider, int partition,
-                int nPartitions) {
-            // Create the in-memory hash table
-            final SpillableGroupingHashTable gTable = new SpillableGroupingHashTable(ctx, keyFields,
-                    comparatorFactories, tpcf, aggregatorFactory, recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0), recordDescriptors[0],
-                            // Always take one frame for the input records
-                            framesLimit - 1, tableSize);
-            // Create the tuple accessor
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
-            // Create the partial aggregate activity node
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-
-                /**
-                 * Run files
-                 */
-                private LinkedList<RunFileReader> runs;
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (gTable.getFrameCount() >= 0) {
-                        if (runs.size() <= 0) {
-                            // All in memory
-                            env.set(GROUPTABLES, gTable);
-                        } else {
-                            // flush the memory into the run file.
-                            flushFramesToRun();
-                        }
-                    }
-                    env.set(RUNS, runs);
-                }
-
-                @Override
-                public void flush() throws HyracksDataException {
-
-                }
-
-                /**
-                 * Process the next input buffer.
-                 * The actual insertion is processed in {@link #gTable}. It will
-                 * check whether it is possible to contain the data into the
-                 * main memory or not. If not, it will indicate the operator to
-                 * flush the content of the table into a run file.
-                 */
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; i++) {
-                        // If the group table is too large, flush the table into
-                        // a run file.
-                        if (!gTable.insert(accessor, i)) {
-                            flushFramesToRun();
-                            if (!gTable.insert(accessor, i))
-                                throw new HyracksDataException(
-                                "Failed to insert a new buffer into the aggregate operator!");
-                        }
-                    }
-
-                }
-
-                @Override
-                public void open() throws HyracksDataException {
-                    runs = new LinkedList<RunFileReader>();
-                    gTable.reset();
-                }
-
-                /**
-                 * Flush the content of the group table into a run file.
-                 * During the flushing, the hash table will be sorted as first.
-                 * After that, a run file handler is initialized and the hash
-                 * table is flushed into the run file.
-                 * 
-                 * @throws HyracksDataException
-                 */
-                private void flushFramesToRun() throws HyracksDataException {
-                    // Sort the contents of the hash table.
-                    gTable.sortFrames();
-                    FileReference runFile;
-                    try {
-                        runFile = ctx.getJobletContext().createWorkspaceFile(
-                                ExternalHashGroupOperatorDescriptor.class.getSimpleName());
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
-                    writer.open();
-                    try {
-                        gTable.flushFrames(writer, true);
-                    } catch (Exception ex) {
-                        throw new HyracksDataException(ex);
-                    } finally {
-                        writer.close();
-                    }
-                    gTable.reset();
-                    runs.add(((RunFileWriter) writer).createReader());
-                    LOGGER.warning("Created run file: " + runFile.getFile().getAbsolutePath());
-                }
-
-            };
-
-            return op;
-        }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalHashGroupOperatorDescriptor.this;
-        }
-
-    }
-
-    private class MergeActivity extends AbstractActivityNode {
-
-        /**
-         * 
-         */
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
-                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
-                int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                /**
-                 * Input frames, one for each run file.
-                 */
-                private List<ByteBuffer> inFrames;
-
-                /**
-                 * Output frame.
-                 */
-                private ByteBuffer outFrame;
-
-                /**
-                 * List of the run files to be merged
-                 */
-                LinkedList<RunFileReader> runs;
-
-                /**
-                 * Tuple appender for the output frame {@link #outFrame}.
-                 */
-                private FrameTupleAppender outFrameAppender;
-
-                private ISpillableAccumulatingAggregator visitingAggregator;
-                private ArrayTupleBuilder visitingKeyTuple;
-
-                @SuppressWarnings("unchecked")
-                @Override
-                public void initialize() throws HyracksDataException {
-                    runs = (LinkedList<RunFileReader>) env.get(RUNS);
-                    writer.open();
-
-                    try {
-                        if (runs.size() <= 0) {
-                            // If the aggregate results can be fit into
-                            // memory...
-                            SpillableGroupingHashTable gTable = (SpillableGroupingHashTable) env.get(GROUPTABLES);
-                            if (gTable != null) {
-                                gTable.flushFrames(writer, sortOutput);
-                            }
-                            env.set(GROUPTABLES, null);
-                        } else {
-                            // Otherwise, merge the run files into a single file
-                            inFrames = new ArrayList<ByteBuffer>();
-                            outFrame = ctx.allocateFrame();
-                            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-                            outFrameAppender.reset(outFrame, true);
-                            for (int i = 0; i < framesLimit - 1; ++i) {
-                                inFrames.add(ctx.allocateFrame());
-                            }
-                            int passCount = 0;
-                            while (runs.size() > 0) {
-                                passCount++;
-                                try {
-                                    doPass(runs, passCount);
-                                } catch (Exception e) {
-                                    throw new HyracksDataException(e);
-                                }
-                            }
-                        }
-
-                    } finally {
-                        writer.close();
-                    }
-                    env.set(RUNS, null);
-                }
-
-                /**
-                 * Merge the run files once.
-                 * 
-                 * @param runs
-                 * @param passCount
-                 * @throws HyracksDataException
-                 * @throws IOException
-                 */
-                private void doPass(LinkedList<RunFileReader> runs, int passCount) throws HyracksDataException,
-                IOException {
-                    FileReference newRun = null;
-                    IFrameWriter writer = this.writer;
-                    boolean finalPass = false;
-
-                    int[] storedKeys = new int[keyFields.length];
-                    // Get the list of the fields in the stored records.
-                    for (int i = 0; i < keyFields.length; ++i) {
-                        storedKeys[i] = i;
-                    }
-
-                    // Release the space not used
-                    if (runs.size() + 1 <= framesLimit) {
-                        // If there are run files no more than the available
-                        // frame slots...
-                        // No run file to be generated, since the result can be
-                        // directly
-                        // outputted into the output frame for write.
-                        finalPass = true;
-                        for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-                            inFrames.remove(i);
-                        }
-                    } else {
-                        // Otherwise, a new run file will be created
-                        newRun = ctx.getJobletContext().createWorkspaceFile(
-                                ExternalHashGroupOperatorDescriptor.class.getSimpleName());
-                        writer = new RunFileWriter(newRun, ctx.getIOManager());
-                        writer.open();
-                    }
-                    try {
-                        // Create run file read handler for each input frame
-                        RunFileReader[] runFileReaders = new RunFileReader[inFrames.size()];
-                        // Create input frame accessor
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
-                                recordDescriptors[0], inFrames.size(), comparator);
-                        // For the index of tuples visited in each frame.
-                        int[] tupleIndexes = new int[inFrames.size()];
-                        for (int i = 0; i < inFrames.size(); i++) {
-                            tupleIndexes[i] = 0;
-                            int runIndex = topTuples.peek().getRunid();
-                            runFileReaders[runIndex] = runs.get(runIndex);
-                            runFileReaders[runIndex].open();
-                            // Load the first frame of the file into the main
-                            // memory
-                            if (runFileReaders[runIndex].nextFrame(inFrames.get(runIndex))) {
-                                // initialize the tuple accessor for the frame
-                                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
-                                        recordDescriptors[0]);
-                                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                                setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
-                            } else {
-                                closeRun(runIndex, runFileReaders, tupleAccessors);
-                            }
-                        }
-                        // Merge
-                        // Get a key holder for the current working
-                        // aggregator keys
-                        visitingAggregator = null;
-                        visitingKeyTuple = null;
-                        // Loop on all run files, and update the key
-                        // holder.
-                        while (!topTuples.areRunsExhausted()) {
-                            // Get the top record
-                            ReferenceEntry top = topTuples.peek();
-                            int tupleIndex = top.getTupleIndex();
-                            int runIndex = topTuples.peek().getRunid();
-                            FrameTupleAccessor fta = top.getAccessor();
-                            if (visitingAggregator == null) {
-                                // Initialize the aggregator
-                                visitingAggregator = aggregatorFactory.createSpillableAggregator(ctx,
-                                        recordDescriptors[0], recordDescriptors[0]);
-                                // Initialize the partial aggregation result
-                                visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
-                                visitingKeyTuple = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
-                                for (int i = 0; i < keyFields.length; i++) {
-                                    visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
-                                }
-                            } else {
-                                if (compareTupleWithFrame(visitingKeyTuple, fta, tupleIndex, storedKeys, keyFields,
-                                        comparators) == 0) {
-                                    // If the two partial results are on the
-                                    // same key
-                                    visitingAggregator.accumulatePartialResult(fta, tupleIndex, keyFields);
-                                } else {
-                                    // Otherwise, write the partial result back
-                                    // to the output frame
-                                    if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
-                                        FrameUtils.flushFrame(outFrame, writer);
-                                        outFrameAppender.reset(outFrame, true);
-                                        if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
-                                            throw new IllegalStateException();
-                                        }
-                                    }
-                                    // Reset the partial aggregation result
-                                    visitingAggregator.initFromPartial(fta, tupleIndex, keyFields);
-                                    visitingKeyTuple.reset();
-                                    for (int i = 0; i < keyFields.length; i++) {
-                                        visitingKeyTuple.addField(fta, tupleIndex, keyFields[i]);
-                                    }
-                                }
-                            }
-                            tupleIndexes[runIndex]++;
-                            setNextTopTuple(runIndex, tupleIndexes, runFileReaders, tupleAccessors, topTuples);
-                        }
-                        // Output the last aggregation result in the frame
-                        if (visitingAggregator != null) {
-                            if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
-                                FrameUtils.flushFrame(outFrame, writer);
-                                outFrameAppender.reset(outFrame, true);
-                                if (!visitingAggregator.output(outFrameAppender, visitingKeyTuple)) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                        }
-                        // Output data into run file writer after all tuples
-                        // have been checked
-                        if (outFrameAppender.getTupleCount() > 0) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            outFrameAppender.reset(outFrame, true);
-                        }
-                        // Remove the runs that were just merged from the run list.
-                        runs.subList(0, inFrames.size()).clear();
-                        // insert the new run file into the beginning of the run
-                        // file list
-                        if (!finalPass) {
-                            runs.add(0, ((RunFileWriter) writer).createReader());
-                        }
-                    } catch (Exception ex) {
-                        throw new HyracksDataException(ex);
-                    } finally {
-                        if (!finalPass) {
-                            writer.close();
-                        }
-                    }
-                }
-
-                /**
-                 * Advance the given run to its next tuple and update the
-                 * priority queue: replace the top entry if the run has more
-                 * tuples, or pop it and close the run otherwise.
-                 * 
-                 * @param runIndex
-                 * @param tupleIndexes
-                 * @param runCursors
-                 * @param tupleAccessors
-                 * @param topTuples
-                 * @throws IOException
-                 */
-                private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
-                    boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-                    if (exists) {
-                        topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
-                    } else {
-                        topTuples.pop();
-                        closeRun(runIndex, runCursors, tupleAccessors);
-                    }
-                }
-
-                /**
-                 * Check whether there are any more tuples to be checked for the
-                 * given run file from the corresponding input frame.
-                 * If the input frame for this run file is exhausted, load a new
-                 * frame of the run file into the input frame.
-                 * 
-                 * @param runIndex
-                 * @param tupleIndexes
-                 * @param runCursors
-                 * @param tupleAccessors
-                 * @return whether the run has at least one more tuple to process
-                 * @throws IOException
-                 */
-                private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors) throws IOException {
-
-                    if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
-                        /*
-                         * Return false if the target run file or its frame is
-                         * not available.
-                         */
-                        return false;
-                    } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-                        /*
-                         * If all tuples in the target frame have been
-                         * checked.
-                         */
-                        ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same as inFrames.get(runIndex)
-                        // Refill the buffer with contents from the run file.
-                        if (runCursors[runIndex].nextFrame(buf)) {
-                            tupleIndexes[runIndex] = 0;
-                            return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-                        } else {
-                            return false;
-                        }
-                    } else {
-                        return true;
-                    }
-                }
-
-                /**
-                 * Close the run file and release the corresponding reader and
-                 * tuple accessor.
-                 * 
-                 * @param index
-                 * @param runCursors
-                 * @param tupleAccessor
-                 * @throws HyracksDataException
-                 */
-                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-                        throws HyracksDataException {
-                    runCursors[index].close();
-                    runCursors[index] = null;
-                    tupleAccessor[index] = null;
-                }
-
-                /**
-                 * Compare a tuple (held in an {@link ArrayTupleBuilder}) with a record in a frame (accessed through a
-                 * {@link FrameTupleAccessor}). The key fields to compare and the comparators to use are supplied as inputs.
-                 * 
-                 * @param tuple0
-                 * @param accessor1
-                 * @param tIndex1
-                 * @param keys0
-                 * @param keys1
-                 * @param comparators
-                 * @return 0 if the keys are equal, otherwise the first non-zero comparator result
-                 */
-                private int compareTupleWithFrame(ArrayTupleBuilder tuple0, FrameTupleAccessor accessor1, int tIndex1,
-                        int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
-                    int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-                    int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-                    for (int i = 0; i < keys0.length; ++i) {
-                        int fIdx0 = keys0[i];
-                        int fStart0 = (i == 0 ? 0 : tuple0.getFieldEndOffsets()[fIdx0 - 1]);
-                        int fEnd0 = tuple0.getFieldEndOffsets()[fIdx0];
-                        int fLen0 = fEnd0 - fStart0;
-
-                        int fIdx1 = keys1[i];
-                        int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
-                        int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
-                        int fLen1 = fEnd1 - fStart1;
-
-                        int c = comparators[i].compare(tuple0.getByteArray(), fStart0, fLen0, accessor1.getBuffer()
-                                .array(), fStart1 + fStartOffset1, fLen1);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-            };
-            return op;
-        }
-
-        @Override
-        public IOperatorDescriptor getOwner() {
-            return ExternalHashGroupOperatorDescriptor.this;
-        }
-
-        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
-            return new Comparator<ReferenceEntry>() {
-                public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                    int j1 = (Integer) tp1.getTupleIndex();
-                    int j2 = (Integer) tp2.getTupleIndex();
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = keyFields[f];
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-            };
-        }
-
-    }
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
new file mode 100644
index 0000000..a8b61fc
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableGroupingTableFactory.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HashSpillableGroupingTableFactory implements ISpillableTableFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePartitionComputerFactory tpcf;
+    private final int tableSize;
+
+    public HashSpillableGroupingTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
+        this.tpcf = tpcf;
+        this.tableSize = tableSize;
+    }
+
+    @Override
+    public ISpillableTable buildSpillableTable(final IHyracksStageletContext ctx, final int[] keyFields,
+            final IBinaryComparatorFactory[] comparatorFactories,
+            final INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            final IAggregatorDescriptorFactory aggregateDescriptorFactory, final RecordDescriptor inRecordDescriptor,
+            final RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
+        final int[] storedKeys = new int[keyFields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
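+        // Inside the table, key fields are stored at the front of each tuple, so they are addressed as fields 0..keyFields.length-1.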
+        for (int i = 0; i < keyFields.length; i++) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+        }
+
+        RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+        final FrameTupleAccessor storedKeysAccessor1;
+        final FrameTupleAccessor storedKeysAccessor2;
+        if (keyFields.length >= outRecordDescriptor.getFields().length) {
+            // Zero-aggregation case: the output record has no field beyond the keys, so widen the internal record by one slot for the aggregate state.
+            ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
+            ITypeTrait[] types = outRecordDescriptor.getTypeTraits();
+            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+            for (int i = 0; i < fields.length; i++)
+                newFields[i] = fields[i];
+            ITypeTrait[] newTypes = null;
+            if (types != null) {
+                newTypes = new ITypeTrait[types.length + 1];
+                for (int i = 0; i < types.length; i++)
+                    newTypes[i] = types[i];
+            }
+            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+        }
+        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
+        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+        final ByteBuffer outFrame = ctx.allocateFrame();
+
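+        // Size the in-table tuple builder to match the (possibly widened) internal record: one extra slot when the output record has no value fields.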
+        final ArrayTupleBuilder internalTupleBuilder;
+        if (keyFields.length < outRecordDescriptor.getFields().length)
+            internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        else
+            internalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
+        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+                .createNormalizedKeyComputer();
+
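+        // The returned table stores each group as a tuple (key fields followed by aggregate state) in the frame list, and indexes it through a serializable hash table of (frameIndex, tupleIndex) pointers.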
+        return new ISpillableTable() {
+            private int dataFrameCount;
+            private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+            private final TuplePointer storedTuplePointer = new TuplePointer();
+            private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+            private int groupSize = 0;
+            private IAggregatorDescriptor aggregator = aggregateDescriptorFactory.createAggregator(ctx,
+                    inRecordDescriptor, outRecordDescriptor, keyFields);
+
+            /**
+             * A tuple is "pointed" to by 3 entries in the tPointers array:
+             * [0] = index of the hash table entry, [1] = offset of the tuple
+             * pointer within that entry's chain, [2] = poor man's normalized
+             * key for the tuple.
+             */
+            private int[] tPointers;
+
+            @Override
+            public void reset() {
+                groupSize = 0;
+                dataFrameCount = -1;
+                tPointers = null;
+                table.reset();
+                aggregator.close();
+            }
+
+            @Override
+            public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                if (dataFrameCount < 0)
+                    nextAvailableFrame();
+                int entry = tpc.partition(accessor, tIndex, tableSize);
+                boolean foundGroup = false;
+                int offset = 0;
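+                // Walk this entry's chain of stored-tuple pointers until a group with equal keys is found or the chain ends (frameIndex < 0).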
+                do {
+                    table.getTuplePointer(entry, offset++, storedTuplePointer);
+                    if (storedTuplePointer.frameIndex < 0)
+                        break;
+                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+                    int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
+                    if (c == 0) {
+                        foundGroup = true;
+                        break;
+                    }
+                } while (true);
+
+                if (!foundGroup) {
+                    // If no matching group is found, create a new aggregator
+                    // Create a tuple for the new group
+                    internalTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        internalTupleBuilder.addField(accessor, tIndex, keyFields[i]);
+                    }
+                    aggregator.init(accessor, tIndex, internalTupleBuilder);
+                    if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
+                            internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                        if (!nextAvailableFrame()) {
+                            return false;
+                        } else {
+                            if (!appender.append(internalTupleBuilder.getFieldEndOffsets(),
+                                    internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+                                throw new IllegalStateException("Failed to append the new group to an empty frame");
+                            }
+                        }
+                    }
+
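+                    // Register the new group: its tuple is the last one appended to the current data frame.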
+                    storedTuplePointer.frameIndex = dataFrameCount;
+                    storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+                    table.insert(entry, storedTuplePointer);
+                    groupSize++;
+                } else {
+                    // A matching group exists: aggregate directly into its stored aggregate state field.
+                    int tupleOffset = storedKeysAccessor1.getTupleStartOffset(storedTuplePointer.tupleIndex);
+                    int aggFieldOffset = storedKeysAccessor1.getFieldStartOffset(storedTuplePointer.tupleIndex,
+                            keyFields.length);
+                    int tupleLength = storedKeysAccessor1.getFieldLength(storedTuplePointer.tupleIndex,
+                            keyFields.length);
+                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1.getBuffer().array(), tupleOffset
+                            + storedKeysAccessor1.getFieldSlotsLength() + aggFieldOffset, tupleLength);
+                }
+                return true;
+            }
+
+            @Override
+            public List<ByteBuffer> getFrames() {
+                return frames;
+            }
+
+            @Override
+            public int getFrameCount() {
+                return dataFrameCount;
+            }
+
+            @Override
+            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
+                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                writer.open();
+                appender.reset(outFrame, true);
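+                // Two emission paths: if sortFrames() was never called, scan the buckets in hash order; otherwise follow the sorted tPointers.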
+                if (tPointers == null) {
+                    // Not sorted
+                    for (int i = 0; i < tableSize; ++i) {
+                        int entry = i;
+                        int offset = 0;
+                        do {
+                            table.getTuplePointer(entry, offset++, storedTuplePointer);
+                            if (storedTuplePointer.frameIndex < 0)
+                                break;
+                            int bIndex = storedTuplePointer.frameIndex;
+                            int tIndex = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor1.reset(frames.get(bIndex));
+                            // Reset the tuple for the partial result
+                            outputTupleBuilder.reset();
+                            for (int k = 0; k < keyFields.length; k++) {
+                                outputTupleBuilder.addField(storedKeysAccessor1, tIndex, k);
+                            }
+                            if (isPartial)
+                                aggregator.outputPartialResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
+                            else
+                                aggregator.outputResult(storedKeysAccessor1, tIndex, outputTupleBuilder);
+                            while (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
+                                    outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                                FrameUtils.flushFrame(outFrame, writer);
+                                appender.reset(outFrame, true);
+                            }
+                        } while (true);
+                    }
+                    if (appender.getTupleCount() != 0) {
+                        FrameUtils.flushFrame(outFrame, writer);
+                    }
+                    aggregator.close();
+                    return;
+                }
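+                // Sorted path: each pointer is an (entry, chain offset, normalized key) triple; emit the groups in sorted order.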
+                int n = tPointers.length / 3;
+                for (int ptr = 0; ptr < n; ptr++) {
+                    int tableIndex = tPointers[ptr * 3];
+                    int rowIndex = tPointers[ptr * 3 + 1];
+                    table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
+                    int frameIndex = storedTuplePointer.frameIndex;
+                    int tupleIndex = storedTuplePointer.tupleIndex;
+                    // Get the frame containing the value
+                    ByteBuffer buffer = frames.get(frameIndex);
+                    storedKeysAccessor1.reset(buffer);
+
+                    outputTupleBuilder.reset();
+                    for (int k = 0; k < keyFields.length; k++) {
+                        outputTupleBuilder.addField(storedKeysAccessor1, tupleIndex, k);
+                    }
+                    if (isPartial)
+                        aggregator.outputPartialResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
+                    else
+                        aggregator.outputResult(storedKeysAccessor1, tupleIndex, outputTupleBuilder);
+                    if (!appender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
+                            outputTupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outFrame, writer);
+                        appender.reset(outFrame, true);
+                        if (!appender.append(outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+                if (appender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                }
+                aggregator.close();
+            }
+
+            /**
+             * Set the working frame to the next available frame in the frame
+             * list. There are two cases:<br>
+             * 1) if the next frame has not been allocated yet, allocate a new
+             * frame; 2) otherwise, recycle an already-created frame.
+             * 
+             * @return Whether a new frame is added successfully.
+             */
+            private boolean nextAvailableFrame() {
+                // Return false if the frame limit has been reached.
+                if (dataFrameCount + 1 >= framesLimit)
+                    return false;
+
+                if (frames.size() < framesLimit) {
+                    // Insert a new frame
+                    ByteBuffer frame = ctx.allocateFrame();
+                    frame.position(0);
+                    frame.limit(frame.capacity());
+                    frames.add(frame);
+                    appender.reset(frame, true);
+                    dataFrameCount = frames.size() - 1;
+                } else {
+                    // Reuse an old frame
+                    dataFrameCount++;
+                    ByteBuffer frame = frames.get(dataFrameCount);
+                    frame.position(0);
+                    frame.limit(frame.capacity());
+                    appender.reset(frame, true);
+                }
+                return true;
+            }
+
+            @Override
+            public void sortFrames() {
+                int sfIdx = storedKeys[0];
+                int totalTCount = table.getTupleCount();
+                tPointers = new int[totalTCount * 3];
+                int ptr = 0;
+
+                for (int i = 0; i < tableSize; i++) {
+                    int entry = i;
+                    int offset = 0;
+                    do {
+                        table.getTuplePointer(entry, offset, storedTuplePointer);
+                        if (storedTuplePointer.frameIndex < 0)
+                            break;
+                        tPointers[ptr * 3] = entry;
+                        tPointers[ptr * 3 + 1] = offset;
+                        int fIndex = storedTuplePointer.frameIndex;
+                        int tIndex = storedTuplePointer.tupleIndex;
+                        storedKeysAccessor1.reset(frames.get(fIndex));
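+                        // Compute the absolute byte offset of the first sort field; its bytes feed the normalized key.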
+                        int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
+                        int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
+                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
+                        int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
+                        tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
+                                .array(), f0Start, f0EndRel - f0StartRel);
+                        ptr++;
+                        offset++;
+                    } while (true);
+                }
+                // Sort using quick sort
+                if (tPointers.length > 0) {
+                    sort(tPointers, 0, totalTCount);
+                }
+            }
+
+            private void sort(int[] tPointers, int offset, int length) {
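+                // Three-way partition quicksort over pointer triples: order by the unsigned normalized key, breaking ties with a full key comparison.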
+                int m = offset + (length >> 1);
+                // Get table index
+                int mTable = tPointers[m * 3];
+                int mRow = tPointers[m * 3 + 1];
+                int mNormKey = tPointers[m * 3 + 2];
+                // Get frame and tuple index
+                table.getTuplePointer(mTable, mRow, storedTuplePointer);
+                int mFrame = storedTuplePointer.frameIndex;
+                int mTuple = storedTuplePointer.tupleIndex;
+                storedKeysAccessor1.reset(frames.get(mFrame));
+
+                int a = offset;
+                int b = a;
+                int c = offset + length - 1;
+                int d = c;
+                while (true) {
+                    while (b <= c) {
+                        int bTable = tPointers[b * 3];
+                        int bRow = tPointers[b * 3 + 1];
+                        int bNormKey = tPointers[b * 3 + 2];
+                        int cmp = 0;
+                        if (bNormKey != mNormKey) {
+                            cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                        } else {
+                            table.getTuplePointer(bTable, bRow, storedTuplePointer);
+                            int bFrame = storedTuplePointer.frameIndex;
+                            int bTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(bFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp > 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, a++, b);
+                        }
+                        ++b;
+                    }
+                    while (c >= b) {
+                        int cTable = tPointers[c * 3];
+                        int cRow = tPointers[c * 3 + 1];
+                        int cNormKey = tPointers[c * 3 + 2];
+                        int cmp = 0;
+                        if (cNormKey != mNormKey) {
+                            cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                        } else {
+                            table.getTuplePointer(cTable, cRow, storedTuplePointer);
+                            int cFrame = storedTuplePointer.frameIndex;
+                            int cTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(cFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp < 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, c, d--);
+                        }
+                        --c;
+                    }
+                    if (b > c)
+                        break;
+                    swap(tPointers, b++, c--);
+                }
+
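+                // Swap the pivot-equal regions from the ends back to the middle, then recurse on the strictly-less and strictly-greater parts.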
+                int s;
+                int n = offset + length;
+                s = Math.min(a - offset, b - a);
+                vecswap(tPointers, offset, b - s, s);
+                s = Math.min(d - c, n - d - 1);
+                vecswap(tPointers, b, n - s, s);
+
+                if ((s = b - a) > 1) {
+                    sort(tPointers, offset, s);
+                }
+                if ((s = d - c) > 1) {
+                    sort(tPointers, n - s, s);
+                }
+            }
+
+            private void swap(int[] x, int a, int b) {
+                for (int i = 0; i < 3; ++i) {
+                    int t = x[a * 3 + i];
+                    x[a * 3 + i] = x[b * 3 + i];
+                    x[b * 3 + i] = t;
+                }
+            }
+
+            private void vecswap(int[] x, int a, int b, int n) {
+                for (int i = 0; i < n; i++, a++, b++) {
+                    swap(x, a, b);
+                }
+            }
+
+            @Override
+            public void close() {
+                groupSize = 0;
+                dataFrameCount = -1;
+                tPointers = null;
+                table.close();
+                frames.clear();
+            }
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
index 3fc7d79..8eff82f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregatorFactory.java
@@ -24,6 +24,4 @@
     IAccumulatingAggregator createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException;
 
-    ISpillableAccumulatingAggregator createSpillableAggregator(IHyracksStageletContext ctx,
-            RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
deleted file mode 100644
index 59c69eb..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableAccumulatingAggregator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * A spillable version of the {@link IAccumulatingAggregator} supporting
- * external aggregation.
- */
-public interface ISpillableAccumulatingAggregator extends IAccumulatingAggregator {
-
-    public void initFromPartial(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
-            throws HyracksDataException;
-
-    /**
-     * @param accessor
-     * @param tIndex
-     * @throws HyracksDataException
-     */
-    public void accumulatePartialResult(IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
-            throws HyracksDataException;
-
-    public boolean output(FrameTupleAppender appender, ArrayTupleBuilder tbder) throws HyracksDataException;
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
new file mode 100644
index 0000000..7bd8cd8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTable.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A spillable, in-memory table for grouping: tuples are inserted until the
+ * frame budget is exhausted, after which the contents can be sorted and
+ * flushed (partially or fully) to a frame writer.
+ * 
+ * @author jarodwen
+ */
+public interface ISpillableTable {
+
+    public void close();
+
+    public void reset();
+
+    public int getFrameCount();
+
+    public List<ByteBuffer> getFrames();
+
+    public void sortFrames();
+
+    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+    public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException;
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
new file mode 100644
index 0000000..07f0ac0
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+
+/**
+ * A serializable factory that builds {@link ISpillableTable} instances.
+ * 
+ * @author jarodwen
+ */
+public interface ISpillableTableFactory extends Serializable {
+    ISpillableTable buildSpillableTable(IHyracksStageletContext ctx, int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
deleted file mode 100644
index b3b9f24..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SpillableGroupingHashTable.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-/**
- * An in-memory hash table for spillable grouping operations.
- * A table of {@link Link}s is maintained in this object, and each row
- * of this table represents a hash partition.
- */
-public class SpillableGroupingHashTable {
-
-    /**
-     * Context.
-     */
-    private final IHyracksStageletContext ctx;
-
-    /**
-     * Columns for group-by
-     */
-    private final int[] fields;
-
-    /**
-     * Key fields of records in the hash table (numbered from 0
-     * up to the number of key fields).
-     * This is different from the key fields in the input records,
-     * since these fields are extracted when being inserted into
-     * the hash table.
-     */
-    private final int[] storedKeys;
-
-    /**
-     * Comparators: one for each column in {@link #fields}
-     */
-    private final IBinaryComparator[] comparators;
-
-    /**
-     * Record descriptor for the input tuple.
-     */
-    private final RecordDescriptor inRecordDescriptor;
-
-    /**
-     * Record descriptor for the partial aggregation result.
-     */
-    private final RecordDescriptor outputRecordDescriptor;
-
-    /**
-     * Accumulators in the main memory.
-     */
-    private ISpillableAccumulatingAggregator[] accumulators;
-
-    /**
-     * The hashing group table containing pointers to aggregators and also the
-     * corresponding key tuples. So for each entry, there will be three integer
-     * fields:
-     * 1. The frame index containing the key tuple; 2. The tuple index inside of
-     * the frame for the key tuple; 3. The index of the aggregator.
-     * Note that each link in the table is a partition for the input records;
-     * records that fall into the same partition under {@link #tpc} are
-     * chained as pointer triples.
-     */
-    private final Link[] table;
-
-    /**
-     * Number of accumulators.
-     */
-    private int accumulatorSize = 0;
-
-    /**
-     * Factory for the aggregators.
-     */
-    private final IAccumulatingAggregatorFactory aggregatorFactory;
-
-    private final List<ByteBuffer> frames;
-
-    private final ByteBuffer outFrame;
-
-    /**
-     * Frame appender for output frames in {@link #frames}.
-     */
-    private final FrameTupleAppender appender;
-
-    /**
-     * The count of used frames in the table.
-     * Note that this cannot be derived from {@link #frames}.size(), since
-     * frames are recycled rather than removed after being created.
-     */
-    private int dataFrameCount;
-
-    /**
-     * Pointers for the sorted aggregators
-     */
-    private int[] tPointers;
-
-    private static final int INIT_ACCUMULATORS_SIZE = 8;
-
-    /**
-     * The maximum number of frames available for this hashing group table.
-     */
-    private final int framesLimit;
-
-    private final FrameTuplePairComparator ftpc;
-
-    /**
-     * A partition computer to partition the hashing group table.
-     */
-    private final ITuplePartitionComputer tpc;
-
-    /**
-     * Accessors for the tuples. Two accessors are necessary during the sort.
-     */
-    private final FrameTupleAccessor storedKeysAccessor1;
-    private final FrameTupleAccessor storedKeysAccessor2;
-
-    /**
-     * Create a spillable grouping hash table.
-     * 
-     * @param ctx
-     *            The context of the job.
-     * @param fields
-     *            Fields of keys for grouping.
-     * @param comparatorFactories
-     *            The comparators.
-     * @param tpcf
-     *            The partitioner factory, used to route the incoming records to the proper partition of the hash table.
-     * @param aggregatorFactory
-     *            The aggregators.
-     * @param inRecordDescriptor
-     *            Record descriptor for input data.
-     * @param outputRecordDescriptor
-     *            Record descriptor for output data.
-     * @param framesLimit
-     *            The maximum number of frames usable in the memory for hash table.
-     * @param tableSize
-     *            The size of the table, which specifies the number of partitions of the table.
-     */
-    public SpillableGroupingHashTable(IHyracksStageletContext ctx, int[] fields,
-            IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory tpcf,
-            IAccumulatingAggregatorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outputRecordDescriptor, int framesLimit, int tableSize) {
-        this.ctx = ctx;
-        this.fields = fields;
-
-        storedKeys = new int[fields.length];
-        @SuppressWarnings("rawtypes")
-        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-
-        // Note that after storing a record into the hash table, the index for the fields should
-        // be updated. Here we assume that all these key fields are written at the beginning of 
-        // the record, so their index should start from 0 and end at the length of the key fields.
-        for (int i = 0; i < fields.length; ++i) {
-            storedKeys[i] = i;
-            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-        }
-        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
-        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
-        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
-
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-
-        this.table = new Link[tableSize];
-
-        this.aggregatorFactory = aggregatorFactory;
-        accumulators = new ISpillableAccumulatingAggregator[INIT_ACCUMULATORS_SIZE];
-
-        this.framesLimit = framesLimit;
-
-        // Tuple pair comparator
-        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-
-        // Partitioner
-        tpc = tpcf.createPartitioner();
-
-        this.inRecordDescriptor = inRecordDescriptor;
-        this.outputRecordDescriptor = outputRecordDescriptor;
-        frames = new ArrayList<ByteBuffer>();
-        appender = new FrameTupleAppender(ctx.getFrameSize());
-
-        dataFrameCount = -1;
-
-        outFrame = ctx.allocateFrame();
-    }
-
-    public void reset() {
-        dataFrameCount = -1;
-        tPointers = null;
-        // Reset the grouping hash table
-        for (int i = 0; i < table.length; i++) {
-            table[i] = new Link();
-        }
-    }
-
-    public int getFrameCount() {
-        return dataFrameCount;
-    }
-
-    /**
-     * Get the pointers to the partially aggregated results.
-     * 
-     * @return the tuple pointer array
-     */
-    public int[] getTPointers() {
-        return tPointers;
-    }
-
-    /**
-     * The number of fields in each pointer.
-     * Only two fields are necessary for external grouping: one for the index
-     * into the hash table, and one for the row index inside of the hash
-     * table entry.
-     * 
-     * @return the number of fields per pointer
-     */
-    public int getPtrFields() {
-        return 2;
-    }
-
-    public List<ByteBuffer> getFrames() {
-        return frames;
-    }
-
-    /**
-     * Set the working frame to the next available frame in the
-     * frame list. There are two cases:<br>
-     * 1) if the next frame has not been allocated yet, allocate
-     * a new frame;
-     * 2) otherwise, recycle an already-created frame.
-     * 
-     * @return Whether a new frame is added successfully.
-     */
-    private boolean nextAvailableFrame() {
-        // Return false if the frame limit has been reached.
-        if (dataFrameCount + 1 >= framesLimit)
-            return false;
-
-        if (frames.size() < framesLimit) {
-            // Insert a new frame
-            ByteBuffer frame = ctx.allocateFrame();
-            frame.position(0);
-            frame.limit(frame.capacity());
-            frames.add(frame);
-            appender.reset(frame, true);
-            dataFrameCount++;
-        } else {
-            // Reuse an old frame
-            dataFrameCount++;
-            ByteBuffer frame = frames.get(dataFrameCount);
-            frame.position(0);
-            frame.limit(frame.capacity());
-            appender.reset(frame, true);
-        }
-        return true;
-    }
-
-    /**
-     * Insert a new record from the input frame.
-     * 
-     * @param accessor
-     * @param tIndex
-     * @return false if the in-memory table is full and a spill is required
-     * @throws HyracksDataException
-     */
-    public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-        if (dataFrameCount < 0)
-            nextAvailableFrame();
-        // Get the partition for the inserting tuple
-        int entry = tpc.partition(accessor, tIndex, table.length);
-        Link link = table[entry];
-        if (link == null) {
-            link = table[entry] = new Link();
-        }
-        // Find the corresponding aggregator from existing aggregators
-        ISpillableAccumulatingAggregator aggregator = null;
-        for (int i = 0; i < link.size; i += 3) {
-            int sbIndex = link.pointers[i];
-            int stIndex = link.pointers[i + 1];
-            int saIndex = link.pointers[i + 2];
-            storedKeysAccessor1.reset(frames.get(sbIndex));
-            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor1, stIndex);
-            if (c == 0) {
-                aggregator = accumulators[saIndex];
-                break;
-            }
-        }
-        // Do insert
-        if (aggregator == null) {
-            // Did not find the aggregator. Insert a new aggregator entry
-            if (!appender.appendProjection(accessor, tIndex, fields)) {
-                if (!nextAvailableFrame()) {
-                    // If buffer is full, return false to trigger a run file
-                    // write
-                    return false;
-                } else {
-                    // Try to do insert after adding a new frame.
-                    if (!appender.appendProjection(accessor, tIndex, fields)) {
-                        throw new IllegalStateException();
-                    }
-                }
-            }
-            int sbIndex = dataFrameCount;
-            int stIndex = appender.getTupleCount() - 1;
-            if (accumulatorSize >= accumulators.length) {
-                accumulators = Arrays.copyOf(accumulators, accumulators.length * 2);
-            }
-            int saIndex = accumulatorSize++;
-            aggregator = accumulators[saIndex] = aggregatorFactory.createSpillableAggregator(ctx, inRecordDescriptor,
-                    outputRecordDescriptor);
-            aggregator.init(accessor, tIndex);
-            link.add(sbIndex, stIndex, saIndex);
-        }
-        aggregator.accumulate(accessor, tIndex);
-        return true;
-    }
-
-    /**
-     * Sort partial results
-     */
-    public void sortFrames() {
-        int totalTCount = 0;
-        // Get the number of records
-        for (int i = 0; i < table.length; i++) {
-            if (table[i] == null)
-                continue;
-            totalTCount += table[i].size / 3;
-        }
-        // Start sorting:
-        /*
-         * Initialize the pointers according to the data structure used for
-         * the partial aggregates.
-         */
-        tPointers = new int[totalTCount * getPtrFields()];
-        // Initialize pointers
-        int ptr = 0;
-        // Maintain two pointers to each entry of the hashing group table
-        for (int i = 0; i < table.length; i++) {
-            if (table[i] == null)
-                continue;
-            for (int j = 0; j < table[i].size; j = j + 3) {
-                tPointers[ptr * getPtrFields()] = i;
-                tPointers[ptr * getPtrFields() + 1] = j;
-                ptr++;
-            }
-        }
-        // Sort using quick sort
-        if (tPointers.length > 0) {
-            sort(tPointers, 0, totalTCount);
-        }
-    }
-
-    /**
-     * Flush the aggregation results to the given writer, sorting them first
-     * if requested.
-     * 
-     * @param writer
-     * @param sorted
-     * @throws HyracksDataException
-     */
-    public void flushFrames(IFrameWriter writer, boolean sorted) throws HyracksDataException {
-        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-
-        ISpillableAccumulatingAggregator aggregator = null;
-        writer.open();
-        appender.reset(outFrame, true);
-        if (sorted) {
-            sortFrames();
-        }
-        if (tPointers == null) {
-            // Not sorted
-            for (int i = 0; i < table.length; ++i) {
-                Link link = table[i];
-                if (link != null) {
-                    for (int j = 0; j < link.size; j += 3) {
-                        int bIndex = link.pointers[j];
-                        int tIndex = link.pointers[j + 1];
-                        int aIndex = link.pointers[j + 2];
-                        ByteBuffer keyBuffer = frames.get(bIndex);
-                        storedKeysAccessor1.reset(keyBuffer);
-                        aggregator = accumulators[aIndex];
-                        while (!aggregator.output(appender, storedKeysAccessor1, tIndex, storedKeys)) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            appender.reset(outFrame, true);
-                        }
-                    }
-                }
-            }
-            if (appender.getTupleCount() != 0) {
-                FrameUtils.flushFrame(outFrame, writer);
-            }
-            return;
-        }
-        int n = tPointers.length / getPtrFields();
-        for (int ptr = 0; ptr < n; ptr++) {
-            int tableIndex = tPointers[ptr * 2];
-            int rowIndex = tPointers[ptr * 2 + 1];
-            int frameIndex = table[tableIndex].pointers[rowIndex];
-            int tupleIndex = table[tableIndex].pointers[rowIndex + 1];
-            int aggregatorIndex = table[tableIndex].pointers[rowIndex + 2];
-            // Get the frame containing the value
-            ByteBuffer buffer = frames.get(frameIndex);
-            storedKeysAccessor1.reset(buffer);
-
-            // Get the aggregator
-            aggregator = accumulators[aggregatorIndex];
-            // Insert
-            if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
-                FrameUtils.flushFrame(outFrame, writer);
-                appender.reset(outFrame, true);
-                if (!aggregator.output(appender, storedKeysAccessor1, tupleIndex, fields)) {
-                    throw new IllegalStateException();
-                } else {
-                    accumulators[aggregatorIndex] = null;
-                }
-            } else {
-                accumulators[aggregatorIndex] = null;
-            }
-        }
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outFrame, writer);
-        }
-    }
-
-    private void sort(int[] tPointers, int offset, int length) {
-        int m = offset + (length >> 1);
-        // Get table index
-        int mTable = tPointers[m * 2];
-        int mRow = tPointers[m * 2 + 1];
-        // Get frame and tuple index
-        int mFrame = table[mTable].pointers[mRow];
-        int mTuple = table[mTable].pointers[mRow + 1];
-        storedKeysAccessor1.reset(frames.get(mFrame));
-
-        int a = offset;
-        int b = a;
-        int c = offset + length - 1;
-        int d = c;
-        while (true) {
-            while (b <= c) {
-                int bTable = tPointers[b * 2];
-                int bRow = tPointers[b * 2 + 1];
-                int bFrame = table[bTable].pointers[bRow];
-                int bTuple = table[bTable].pointers[bRow + 1];
-                storedKeysAccessor2.reset(frames.get(bFrame));
-                int cmp = ftpc.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
-                // int cmp = compare(tPointers, b, mi, mj, mv);
-                if (cmp > 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(tPointers, a++, b);
-                }
-                ++b;
-            }
-            while (c >= b) {
-                int cTable = tPointers[c * 2];
-                int cRow = tPointers[c * 2 + 1];
-                int cFrame = table[cTable].pointers[cRow];
-                int cTuple = table[cTable].pointers[cRow + 1];
-                storedKeysAccessor2.reset(frames.get(cFrame));
-                int cmp = ftpc.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
-                // int cmp = compare(tPointers, c, mi, mj, mv);
-                if (cmp < 0) {
-                    break;
-                }
-                if (cmp == 0) {
-                    swap(tPointers, c, d--);
-                }
-                --c;
-            }
-            if (b > c)
-                break;
-            swap(tPointers, b++, c--);
-        }
-
-        int s;
-        int n = offset + length;
-        s = Math.min(a - offset, b - a);
-        vecswap(tPointers, offset, b - s, s);
-        s = Math.min(d - c, n - d - 1);
-        vecswap(tPointers, b, n - s, s);
-
-        if ((s = b - a) > 1) {
-            sort(tPointers, offset, s);
-        }
-        if ((s = d - c) > 1) {
-            sort(tPointers, n - s, s);
-        }
-    }
-
-    private void swap(int x[], int a, int b) {
-        for (int i = 0; i < 2; ++i) {
-            int t = x[a * 2 + i];
-            x[a * 2 + i] = x[b * 2 + i];
-            x[b * 2 + i] = t;
-        }
-    }
-
-    private void vecswap(int x[], int a, int b, int n) {
-        for (int i = 0; i < n; i++, a++, b++) {
-            swap(x, a, b);
-        }
-    }
-
-    /**
-     * The pointers in the link store 3 int values for each entry in the
-     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-     * 
-     * @author vinayakb
-     */
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 9;
-
-        int[] pointers;
-        int size;
-
-        Link() {
-            pointers = new int[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-            while (size + 3 > pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
-            }
-            pointers[size++] = bufferIdx;
-            pointers[size++] = tIndex;
-            pointers[size++] = accumulatorIdx;
-        }
-
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("[Size=" + size + "]");
-            for (int i = 0; i < pointers.length; i = i + 3) {
-                sb.append(pointers[i] + ",");
-                sb.append(pointers[i + 1] + ",");
-                sb.append(pointers[i + 2] + "; ");
-            }
-            return sb.toString();
-        }
-    }
-}
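
The Link class deleted above is a small idiom worth keeping in mind: instead of a list of pointer objects, each hash entry's (bufferIdx, tIndex, accumulatorIdx) triple is packed into one flat int array that grows by doubling, avoiding per-triple object headers and boxing. A minimal standalone sketch of the same idiom (the class and accessor names here are illustrative, not Hyracks API):

    import java.util.Arrays;

    // Flat list of (bufferIdx, tIndex, accumulatorIdx) triples, grown by doubling.
    final class IntTripleList {
        private int[] data = new int[9]; // room for three triples to start
        private int size = 0;            // ints in use; always a multiple of 3

        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
            while (size + 3 > data.length) {
                data = Arrays.copyOf(data, data.length * 2);
            }
            data[size++] = bufferIdx;
            data[size++] = tIndex;
            data[size++] = accumulatorIdx;
        }

        int count()               { return size / 3; }
        int bufferIdx(int i)      { return data[i * 3]; }
        int tupleIdx(int i)       { return data[i * 3 + 1]; }
        int accumulatorIdx(int i) { return data[i * 3 + 2]; }
    }
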
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 94a9501..d1d528c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -42,7 +42,7 @@
     private final ByteBuffer outBuffer;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuild;
-    
+
     public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
@@ -57,7 +57,7 @@
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
-        this.isLeftOuter = isLeftOuter;        
+        this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
             int fieldCountOuter = accessor1.getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
@@ -114,13 +114,14 @@
                 }
             }
             if (!matchFound && isLeftOuter) {
-                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                     flushFrame(outBuffer, writer);
                     appender.reset(outBuffer, true);
-                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
-                            .getSize())) {
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                            nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                         throw new IllegalStateException();
-                    }                  
+                    }
                 }
             }
         }
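
The left-outer branch above shows the frame-appending protocol used throughout these operators: try to append; on failure flush the output frame, reset it, and retry exactly once, treating a second failure as a record larger than a whole frame. A schematic sketch of that protocol, with stand-in types rather than the real Hyracks interfaces:

    // Stand-ins for the appender/writer pair; only the contract matters:
    // append() returns false iff the record does not fit in the current frame.
    interface SimpleAppender {
        boolean append(byte[] record);
        byte[] frame();
        void reset(); // start a fresh, empty frame
    }

    interface SimpleWriter {
        void flush(byte[] frame);
    }

    final class AppendOrFlush {
        static void write(SimpleAppender appender, SimpleWriter writer, byte[] record) {
            if (!appender.append(record)) {
                writer.flush(appender.frame());
                appender.reset();
                if (!appender.append(record)) {
                    // Even an empty frame cannot hold it: the record exceeds the frame size.
                    throw new IllegalStateException("record larger than frame");
                }
            }
        }
    }
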
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 7f097e0..e44e8a9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
 
 public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -44,7 +45,7 @@
         @Override
         public void writeData(Object[] data) throws HyracksDataException {
             for (int i = 0; i < data.length; ++i) {
-                System.err.print(String.valueOf(data[i]));
+                System.err.print(StringSerializationUtils.toString(data[i]));
                 System.err.print(", ");
             }
             System.err.println();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 3742e91..710ef38 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -35,14 +35,18 @@
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
     private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
     private final List<ByteBuffer> buffers;
 
     private final FrameTupleAccessor fta1;
     private final FrameTupleAccessor fta2;
 
+    private final FrameTupleAppender appender;
+
+    private final ByteBuffer outFrame;
+
     private int dataFrameCount;
     private int[] tPointers;
+    private int tupleCount;
 
     public FrameSorter(IHyracksCommonContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
@@ -54,39 +58,24 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        this.recordDescriptor = recordDescriptor;
         buffers = new ArrayList<ByteBuffer>();
         fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        outFrame = ctx.allocateFrame();
 
         dataFrameCount = 0;
     }
 
     public void reset() {
         dataFrameCount = 0;
-        tPointers = null;
+        tupleCount = 0;
     }
 
     public int getFrameCount() {
         return dataFrameCount;
     }
 
-    /**
-     * Gets the sorted tuple pointers.
-     * A tuple is "pointed" to by 4 entries in the tPointers array.
-     * [0] = Frame index in the "Frames" list.
-     * [1] = Start offset of the tuple in the frame
-     * [2] = Length of the tuple
-     * [3] = Poor man's normalized key for the tuple.
-     */
-    public int[] getTPointers() {
-        return tPointers;
-    }
-
-    public List<ByteBuffer> getFrames() {
-        return buffers;
-    }
-
     public void insertFrame(ByteBuffer buffer) {
         ByteBuffer copyFrame;
         if (dataFrameCount == buffers.size()) {
@@ -100,55 +89,50 @@
     }
 
     public void sortFrames() {
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         int nBuffers = dataFrameCount;
-        int totalTCount = 0;
+        tupleCount = 0;
         for (int i = 0; i < nBuffers; ++i) {
-            accessor.reset(buffers.get(i));
-            totalTCount += accessor.getTupleCount();
+            fta1.reset(buffers.get(i));
+            tupleCount += fta1.getTupleCount();
         }
         int sfIdx = sortFields[0];
-        tPointers = new int[totalTCount * 4];
+        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
         int ptr = 0;
         for (int i = 0; i < nBuffers; ++i) {
-            accessor.reset(buffers.get(i));
-            int tCount = accessor.getTupleCount();
-            byte[] array = accessor.getBuffer().array();
+            fta1.reset(buffers.get(i));
+            int tCount = fta1.getTupleCount();
+            byte[] array = fta1.getBuffer().array();
             for (int j = 0; j < tCount; ++j) {
-                int tStart = accessor.getTupleStartOffset(j);
-                int tEnd = accessor.getTupleEndOffset(j);
+                int tStart = fta1.getTupleStartOffset(j);
+                int tEnd = fta1.getTupleEndOffset(j);
                 tPointers[ptr * 4] = i;
                 tPointers[ptr * 4 + 1] = tStart;
                 tPointers[ptr * 4 + 2] = tEnd;
-                int f0StartRel = accessor.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = accessor.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + accessor.getFieldSlotsLength();
+                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
+                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
+                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
                 tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
                 ++ptr;
             }
         }
-        if (tPointers.length > 0) {
-            sort(tPointers, 0, totalTCount);
+        if (tupleCount > 0) {
+            sort(tPointers, 0, tupleCount);
         }
     }
 
     public void flushFrames(IFrameWriter writer) throws HyracksDataException {
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        ByteBuffer outFrame = ctx.allocateFrame();
         writer.open();
         appender.reset(outFrame, true);
-        int n = tPointers.length / 4;
-        for (int ptr = 0; ptr < n; ++ptr) {
+        for (int ptr = 0; ptr < tupleCount; ++ptr) {
             int i = tPointers[ptr * 4];
             int tStart = tPointers[ptr * 4 + 1];
             int tEnd = tPointers[ptr * 4 + 2];
             ByteBuffer buffer = buffers.get(i);
-            accessor.reset(buffer);
-            if (!appender.append(accessor, tStart, tEnd)) {
+            fta1.reset(buffer);
+            if (!appender.append(fta1, tStart, tEnd)) {
                 FrameUtils.flushFrame(outFrame, writer);
                 appender.reset(outFrame, true);
-                if (!appender.append(accessor, tStart, tEnd)) {
+                if (!appender.append(fta1, tStart, tEnd)) {
                     throw new IllegalStateException();
                 }
             }
@@ -255,4 +239,4 @@
         }
         return 0;
     }
-}
+}
\ No newline at end of file
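
FrameSorter keeps four ints per tuple in tPointers: frame index, tuple start offset, tuple end offset, and a "poor man's normalized key" of the first sort field (per the javadoc deleted above). The point of the fourth slot is that most comparisons resolve on a single int compare without touching tuple bytes. A sketch of that two-level comparison; the tie-break operator stands in for the full field-by-field binary comparator, and the unsigned compare is this sketch's assumption about how normalized keys order:

    import java.util.function.IntBinaryOperator;

    final class PointerCompare {
        // tPointers layout: [frameIdx, start, end, normalizedKey] per tuple.
        static int compare(int[] tPointers, int a, int b, IntBinaryOperator tieBreak) {
            int nkA = tPointers[a * 4 + 3];
            int nkB = tPointers[b * 4 + 3];
            int c = Integer.compareUnsigned(nkA, nkB); // cheap first pass
            if (c != 0) {
                return c;
            }
            // Equal normalized keys prove nothing: fall back to comparing
            // the actual tuple bytes.
            return tieBreak.applyAsInt(a, b);
        }
    }
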
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java
new file mode 100644
index 0000000..7dc0b17
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface ISerializableTable {
+
+    public void insert(int entry, TuplePointer tuplePointer);
+
+    public void getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
+
+    public int getFrameCount();
+
+    public int getTupleCount();
+
+    public void reset();
+
+    public void close();
+}
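
Callers learn that an entry is exhausted through the sentinel convention visible in the implementation below: getTuplePointer writes -1 into the pointer's frameIndex when the requested offset is past the last stored tuple. A usage sketch of that contract (TuplePointer is the two-field class added later in this change):

    import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
    import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;

    final class EntryScan {
        // Walk every tuple filed under hash entry `entry`.
        static void scanEntry(ISerializableTable table, int entry, TuplePointer tp) {
            for (int offset = 0;; ++offset) {
                table.getTuplePointer(entry, offset, tp);
                if (tp.frameIndex < 0) {
                    break; // sentinel: past the last stored tuple
                }
                // ... compare/emit the tuple at (tp.frameIndex, tp.tupleIndex) ...
            }
        }
    }
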
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
new file mode 100644
index 0000000..9e8cf00
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -0,0 +1,267 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+
+/**
+ * An entry in the table is laid out as: #elements, #non-empty elements;
+ * fIndex, tIndex; fIndex, tIndex; ... Each <fIndex, tIndex> pair forms a
+ * tuple pointer.
+ */
+public class SerializableHashTable implements ISerializableTable {
+
+    private static final int INT_SIZE = 4;
+    private static final int INIT_ENTRY_SIZE = 4;
+
+    private IntSerDeBuffer[] headers;
+    private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
+    private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
+    private final IHyracksStageletContext ctx;
+    private int frameCapacity = 0;
+    private int currentLargestFrameIndex = 0;
+    private int tupleCount = 0;
+    private int headerFrameCount = 0;
+    private TuplePointer tempTuplePointer = new TuplePointer();
+
+    public SerializableHashTable(int tableSize, final IHyracksStageletContext ctx) {
+        this.ctx = ctx;
+        int frameSize = ctx.getFrameSize();
+
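+        // Each entry owns two header ints (content frame index, frame offset),
+        // so the header needs ceil(tableSize * 2 * INT_SIZE / frameSize) frames.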
+        int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
+        int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;
+        headers = new IntSerDeBuffer[headerSize];
+
+        IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
+        contents.add(frame);
+        frameCurrentIndex.add(0);
+        frameCapacity = frame.capacity();
+    }
+
+    @Override
+    public void insert(int entry, TuplePointer pointer) {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header == null) {
+            header = new IntSerDeBuffer(ctx.allocateFrame().array());
+            headers[hFrameIndex] = header;
+            resetFrame(header);
+            headerFrameCount++;
+        }
+        int frameIndex = header.getInt(headerOffset);
+        int offsetIndex = header.getInt(headerOffset + 1);
+        if (frameIndex < 0) {
+            // insert first tuple into the entry
+            insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+        } else {
+            // insert non-first tuple into the entry
+            insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
+        }
+        tupleCount++;
+    }
+
+    @Override
+    public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header == null) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        int frameIndex = header.getInt(headerOffset);
+        int offsetIndex = header.getInt(headerOffset + 1);
+        if (frameIndex < 0) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        IntSerDeBuffer frame = contents.get(frameIndex);
+        int entryUsedItems = frame.getInt(offsetIndex + 1);
+        if (offset > entryUsedItems - 1) {
+            dataPointer.frameIndex = -1;
+            dataPointer.tupleIndex = -1;
+            return;
+        }
+        int startIndex = offsetIndex + 2 + offset * 2;
+        while (startIndex >= frameCapacity) {
+            ++frameIndex;
+            startIndex -= frameCapacity;
+        }
+        frame = contents.get(frameIndex);
+        dataPointer.frameIndex = frame.getInt(startIndex);
+        dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+    }
+
+    @Override
+    public void reset() {
+        for (IntSerDeBuffer frame : headers)
+            if (frame != null)
+                resetFrame(frame);
+
+        frameCurrentIndex.clear();
+        for (int i = 0; i < contents.size(); i++) {
+            frameCurrentIndex.add(0);
+        }
+
+        currentLargestFrameIndex = 0;
+        tupleCount = 0;
+    }
+
+    @Override
+    public int getFrameCount() {
+        return headerFrameCount + contents.size();
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public void close() {
+        for (int i = 0; i < headers.length; i++)
+            headers[i] = null;
+        contents.clear();
+        frameCurrentIndex.clear();
+        tupleCount = 0;
+        currentLargestFrameIndex = 0;
+    }
+
+    private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) {
+        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
+        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+        int requiredIntCapacity = entryCapacity * 2;
+        int startFrameIndex = currentLargestFrameIndex;
+
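+        // If the new entry would spill past the current frame's write cursor,
+        // advance to the next frame(s), allocating on demand; the entry then
+        // starts at offset 0 of the frame after the previous largest one.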
+        if (lastIndex + requiredIntCapacity >= frameCapacity) {
+            IntSerDeBuffer newFrame;
+            startFrameIndex++;
+            do {
+                if (currentLargestFrameIndex >= contents.size() - 1) {
+                    newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
+                    currentLargestFrameIndex++;
+                    contents.add(newFrame);
+                    frameCurrentIndex.add(0);
+                } else {
+                    currentLargestFrameIndex++;
+                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
+                }
+                requiredIntCapacity -= frameCapacity;
+            } while (requiredIntCapacity > 0);
+            lastIndex = 0;
+            lastFrame = contents.get(startFrameIndex);
+        }
+
+        // set header
+        header.writeInt(headerOffset, startFrameIndex);
+        header.writeInt(headerOffset + 1, lastIndex);
+
+        // set the entry
+        lastFrame.writeInt(lastIndex, entryCapacity - 1);
+        lastFrame.writeInt(lastIndex + 1, 1);
+        lastFrame.writeInt(lastIndex + 2, pointer.frameIndex);
+        lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex);
+        int newLastIndex = lastIndex + entryCapacity * 2;
+        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
+        frameCurrentIndex.set(startFrameIndex, newLastIndex);
+
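+        // If the entry spans multiple frames, push each spanned frame's write
+        // cursor past the spilled portion so later entries start after it.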
+        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+        while (requiredIntCapacity > 0) {
+            startFrameIndex++;
+            requiredIntCapacity -= frameCapacity;
+            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
+            frameCurrentIndex.set(startFrameIndex, newLastIndex);
+        }
+    }
+
+    private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex,
+            TuplePointer pointer) {
+        IntSerDeBuffer frame = contents.get(frameIndex);
+        int entryItems = frame.getInt(offsetIndex);
+        int entryUsedItems = frame.getInt(offsetIndex + 1);
+
+        if (entryUsedItems < entryItems) {
+            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
+            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
+            while (startIndex >= frameCapacity) {
+                ++frameIndex;
+                startIndex -= frameCapacity;
+            }
+            frame = contents.get(frameIndex);
+            frame.writeInt(startIndex, pointer.frameIndex);
+            frame.writeInt(startIndex + 1, pointer.tupleIndex);
+        } else {
+            int capacity = (entryItems + 1) * 2;
+            header.writeInt(headerOffset, -1);
+            header.writeInt(headerOffset + 1, -1);
+            int fIndex = frame.getInt(offsetIndex + 2);
+            int tIndex = frame.getInt(offsetIndex + 3);
+            tempTuplePointer.frameIndex = fIndex;
+            tempTuplePointer.tupleIndex = tIndex;
+            this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer);
+            int newFrameIndex = header.getInt(headerOffset);
+            int newTupleIndex = header.getInt(headerOffset + 1);
+
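+            // Element 0 was already carried over by insertNewEntry above, so
+            // rehash only the remaining used elements into the bigger entry.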
+            for (int i = 1; i < entryUsedItems; i++) {
+                int startIndex = offsetIndex + 2 + i * 2;
+                int startFrameIndex = frameIndex;
+                while (startIndex >= frameCapacity) {
+                    ++startFrameIndex;
+                    startIndex -= frameCapacity;
+                }
+                frame = contents.get(startFrameIndex);
+                fIndex = frame.getInt(startIndex);
+                tIndex = frame.getInt(startIndex + 1);
+                tempTuplePointer.frameIndex = fIndex;
+                tempTuplePointer.tupleIndex = tIndex;
+                insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer);
+            }
+            insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer);
+        }
+    }
+
+    private void resetFrame(IntSerDeBuffer frame) {
+        for (int i = 0; i < frameCapacity; i++)
+            frame.writeInt(i, -1);
+    }
+
+    private int getHeaderFrameIndex(int entry) {
+        int frameIndex = entry * 2 / frameCapacity;
+        return frameIndex;
+    }
+
+    private int getHeaderFrameOffset(int entry) {
+        int offset = entry * 2 % frameCapacity;
+        return offset;
+    }
+
+}
+
+class IntSerDeBuffer {
+
+    private byte[] bytes;
+
+    public IntSerDeBuffer(byte[] data) {
+        this.bytes = data;
+    }
+
+    public int getInt(int pos) {
+        int offset = pos * 4;
+        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+                + ((bytes[offset + 3] & 0xff) << 0);
+    }
+
+    public void writeInt(int pos, int value) {
+        int offset = pos * 4;
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public int capacity() {
+        return bytes.length / 4;
+    }
+}
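
IntSerDeBuffer treats the frame's backing byte array as a flat array of big-endian ints, one per 4-byte slot. A standalone round-trip check of that layout, using java.nio.ByteBuffer (big-endian by default) as the reference decoder:

    import java.nio.ByteBuffer;

    public final class IntSlotRoundTrip {
        public static void main(String[] args) {
            byte[] frame = new byte[32];
            int pos = 3, value = 0xCAFEBABE;

            // Write the way IntSerDeBuffer.writeInt does: slot pos -> byte offset pos * 4.
            int off = pos * 4;
            frame[off] = (byte) (value >> 24);
            frame[off + 1] = (byte) (value >> 16);
            frame[off + 2] = (byte) (value >> 8);
            frame[off + 3] = (byte) value;

            // ByteBuffer reads big-endian by default, so the value must survive.
            System.out.println(ByteBuffer.wrap(frame).getInt(off) == value); // true
        }
    }
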
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
new file mode 100644
index 0000000..6618fb1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/TuplePointer.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public class TuplePointer {
+    public int frameIndex;
+    public int tupleIndex;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java
new file mode 100644
index 0000000..1126c1f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/StringSerializationUtils.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.util;
+
+import java.util.Arrays;
+
+public class StringSerializationUtils {
+    public static String toString(Object object) {
+        if (object instanceof Object[]) {
+            return Arrays.deepToString((Object[]) object);
+        } else {
+            return String.valueOf(object);
+        }
+    }
+}
\ No newline at end of file
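
The utility only changes what gets printed for object arrays, where String.valueOf would show an identity hash; Arrays.deepToString also recurses into nested Object[] elements. A small demonstration (note that primitive arrays such as int[] are not Object[] and still fall through to String.valueOf):

    import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;

    public class PrintDemo {
        public static void main(String[] args) {
            Object[] nested = { "a", new Object[] { 1, 2 } };
            System.out.println(String.valueOf(nested));                    // e.g. [Ljava.lang.Object;@1a2b3c4d
            System.out.println(StringSerializationUtils.toString(nested)); // [a, [1, 2]]
        }
    }
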
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 694f183..b107282 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -26,23 +26,24 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MinMaxAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.SumAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.AvgAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.ConcatAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -50,74 +51,107 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
 /**
- * Test cases for external hash group operator.
+ * Test cases for the external grouping operator.
+ *
+ * @author jarodwen
  */
 public class ExternalAggregateTest extends AbstractIntegrationTest {
 
-    /**
-     * Test 01: aggregate (count) on single field, on a simple data set.
-     * 
-     * @throws Exception
-     */
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+    static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
+
+    static final String outSplits1 = "nc1:" + outSplitsPrefix + "/aggregation_";
+    static final String outSplits2 = "nc2:" + outSplitsPrefix + "/aggregation_";
+
+    static final boolean isOutputFile = true;
+
+    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, }, '|');
+
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+        }
+        return fSplits;
+    }
+
+    private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
+            String prefix) {
+        AbstractSingleActivityOperatorDescriptor printer;
+
+        if (!isFile)
+            printer = new PrinterOperatorDescriptor(spec);
+        else
+            printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                    parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+
+        return printer;
+    }
+
     @Test
-    public void externalAggregateTestSingleFieldSimpleData() throws Exception {
+    public void hashSingleKeyScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
-                new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
-                new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        // Input format: a string field as the key
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        // Output format: a string field as the key, and an integer field as the
-        // count
         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-        // Data set format: word(string),count(int)
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec,
-                splitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
-                desc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
-
-        int[] keys = new int[] { 0 };
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(
-                spec, // Job conf
-                keys, // Group key
-                3, // Number of frames
-                false, // Whether to sort the output
-                // Hash partitioner
-                new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                // Key comparator
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                // Aggregator factory
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                outputRec, // Output format
-                tableSize // Size of the hashing table, which is used to control
-        // the partition when hashing
-        );
+                new UTF8StringNormalizedKeyComputerFactory(), new CountAggregatorDescriptorFactory(),
+                new IntSumAggregatorDescriptorFactory(keyFields.length), outputRec,
+                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
+                true);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keys,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashSingleKeyScalarGroupTest");
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -127,186 +161,41 @@
         runTest(spec);
     }
 
-    /**
-     * Test 02: Control experiment using in-memory aggregator, on the same data
-     * set of {@link #externalAggregateTest01()}
-     * 
-     * @throws Exception
-     */
     @Test
-    public void externalAggregateTestSingleFieldSimpleDataInMemControl() throws Exception {
+    public void hashMultipleKeyScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
-                new FileSplit(NC2_ID, new FileReference(new File("data/wordcount.tsv"))),
-                new FileSplit(NC1_ID, new FileReference(new File("data/wordcount.tsv"))) });
-
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec,
-                splitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
 
-        int[] keys = new int[] { 0 };
-        int tableSize = 8;
-
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keys,
-                new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                outputRec, tableSize);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
-
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    /**
-     * Test 03: aggregates on multiple fields
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void externalAggregateTestMultiAggFields() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/lineitem.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
-
-        int[] keys = new int[] { 0 };
-        int tableSize = 8;
-
-        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
-                new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
-                        new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec, tableSize);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, ordScanner, 0, grouper, 0);
-
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    /**
-     * Test 05: aggregate on multiple key fields
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void externalAggregateTestMultiKeys() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/lineitem.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE });
+                IntegerSerializerDeserializer.INSTANCE, });
 
-        // Group on two fields
-        int[] keys = new int[] { 0, 1 };
+        int[] keyFields = new int[] { 0, 9 };
+        int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3, false,
-                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
-                                new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
-                tableSize);
+                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+                new IntSumAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(keyFields.length),
+                outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, ordScanner, 0, grouper, 0);
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -315,66 +204,44 @@
         runTest(spec);
     }
 
-    /**
-     * Test 06: tests on non-string key field
-     * 
-     * @throws Exception
-     */
     @Test
-    public void externalAggregateTestNonStringKey() throws Exception {
+    public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/lineitem.tbl"))) };
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, }, '|'), ordersDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, });
 
-        // Group on two fields
-        int[] keys = new int[] { 0, 1 };
+        int[] keyFields = new int[] { 0, 9 };
+        int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalHashGroupOperatorDescriptor grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, 3000, true,
-                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory(),
-                                new SumAggregatorFactory(4), new MinMaxAggregatorFactory(true, 5) }), outputRec,
-                tableSize);
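+        // Two aggregator factories are supplied: the first appears to compute partial sums
+        // from the raw input fields, the second to merge those partials during the spill
+        // merge (the constructor arguments look like (source field, target field) pairs).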
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+                        new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3) }),
+                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+                        new IntSumAggregatorDescriptorFactory(2, 2), new IntSumAggregatorDescriptorFactory(3, 3) }),
+                outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        IntegerBinaryHashFunctionFactory.INSTANCE, IntegerBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, ordScanner, 0, grouper, 0);
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyMultipleScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -382,4 +249,138 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-}
\ No newline at end of file
+
+    @Test
+    public void hashMultipleKeyNonScalarGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
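+        // Non-scalar aggregation: concatenate field 9 within each group; the merge-side
+        // factory appears to read the concatenated column at index keyFields.length of the
+        // partial output.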
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(), new ConcatAggregatorDescriptorFactory(9),
+                new ConcatAggregatorDescriptorFactory(keyFields.length), outputRec,
+                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
+                true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
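+        // Hash-partition the scanned tuples on the grouping key so that each grouper
+        // partition sees a disjoint set of keys.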
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyNonScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0, 9 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
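+        // Mixes scalar (int sum) and non-scalar (concat) aggregators over the same grouping;
+        // the merge-side factories again appear to address fields by their positions in the
+        // partial output.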
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
+                        new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3),
+                        new ConcatAggregatorDescriptorFactory(9, 4) }), new MultiAggregatorDescriptorFactory(
+                        new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(2, 2),
+                                new IntSumAggregatorDescriptorFactory(3, 3),
+                                new ConcatAggregatorDescriptorFactory(4, 4) }), outputRec,
+                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashMultipleKeyMultipleFieldsGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void hashSingleKeyScalarAvgGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
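+        // An average cannot be recomputed from plain averages, so the partial state produced
+        // by AvgAggregatorDescriptorFactory presumably carries both sum and count for the
+        // merge phase to finish the computation.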
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(), new AvgAggregatorDescriptorFactory(1),
+                new AvgAggregatorDescriptorFactory(keyFields.length), outputRec, new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
+                "hashSingleKeyScalarGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+}
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 8d22bca..b02b0ee 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -36,13 +36,18 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IntSumAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -55,8 +60,9 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableGroupingTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
@@ -112,7 +118,7 @@
         for (int i = 0; i < 3; i++) {
             long start = System.currentTimeMillis();
             job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
-                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 2, options.outPlain);
+                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 3, options.outPlain);
 
             System.out.print(i + "\t" + (System.currentTimeMillis() - start));
             start = System.currentTimeMillis();
@@ -195,14 +201,22 @@
 
         switch (alg) {
             case 0: // External hash group
-                grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
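+                // Case 0: hash-based external group-by computing a count per key; the partial
+                // counts appear to be merged by int-summing the count column at keys.length.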
+                grouper = new ExternalGroupOperatorDescriptor(
+                        spec,
+                        keys,
+                        framesLimit,
+                        new IBinaryComparatorFactory[] {
                         // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
-                        htSize);
+                        IntegerBinaryComparatorFactory.INSTANCE },
+                        new IntegerNormalizedKeyComputerFactory(),
+                        new MultiAggregatorDescriptorFactory(
+                                new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+                        new MultiAggregatorDescriptorFactory(
+                                new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
+                        outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
+                                new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
 
                 createPartitionConstraint(spec, grouper, outSplits);
 
@@ -258,23 +272,31 @@
                 spec.connect(scanConn, fileScanner, 0, grouper, 0);
                 break;
             default:
-                grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
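+                // The default case mirrors case 0: external group-by with count aggregation
+                // and int-sum merging of the partial counts.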
+                grouper = new ExternalGroupOperatorDescriptor(
+                        spec,
+                        keys,
+                        framesLimit,
+                        new IBinaryComparatorFactory[] {
                         // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
-                        htSize);
+                        IntegerBinaryComparatorFactory.INSTANCE },
+                        new IntegerNormalizedKeyComputerFactory(),
+                        new MultiAggregatorDescriptorFactory(
+                                new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+                        new MultiAggregatorDescriptorFactory(
+                                new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
+                        outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
+                                new IBinaryHashFunctionFactory[] {
+                                // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
 
                 createPartitionConstraint(spec, grouper, outSplits);
 
                 // Connect scanner with the grouper
-                IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(spec,
+                IConnectorDescriptor defaultGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
                         new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
                         // IntegerBinaryHashFunctionFactory.INSTANCE,
                                 IntegerBinaryHashFunctionFactory.INSTANCE }));
-                spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+                spec.connect(defaultGroupConn, fileScanner, 0, grouper, 0);
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);