Updated the aggregator interfaces to build their output through ArrayTupleBuilder; removed the state/output length calculator methods, which the builder makes unnecessary.
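
Aggregator call sites used to ask the descriptor for a state/output
length, check for frame overflow, and then let the descriptor write raw
bytes into the frame. They now build each tuple in an ArrayTupleBuilder
and hand it to the new FrameTupleAppender.appendSkipEmptyField(), which
skips field slots that were never set (end offset 0). Every call site
follows the same flush-and-retry shape; a sketch with generic names,
not any one call site:

    tupleBuilder.reset();
    aggregator.outputFinalResult(tupleBuilder, accessor, tIndex, state);
    if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
        FrameUtils.flushFrame(frame, writer); // frame full: flush,
        appender.reset(frame, true);          // reset, and retry once
        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
            throw new HyracksDataException(
                    "Aggregation output is too large to fit into a frame.");
        }
    }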

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@977 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 7647e50..56ba785 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -59,6 +59,31 @@
         }
         return false;
     }
+
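+    /**
+     * Variant of append() that copies only the field slots with a positive
+     * end offset; slots left at zero (fields never set by the tuple
+     * builder) are dropped and the remaining slots are compacted. The
+     * free-space check is conservative: it assumes every slot is kept.
+     */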
+    public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            int effectiveSlots = 0;
+            for (int i = 0; i < fieldSlots.length; ++i) {
+                if (fieldSlots[i] > 0) {
+                    buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+                    effectiveSlots++;
+                }
+            }
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
+            tupleDataEndOffset += effectiveSlots * 4 + length;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
 
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
         int length = tEndOffset - tStartOffset;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index a0a2dd2..2edbf97 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -39,7 +39,9 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
@@ -293,6 +295,9 @@
                 storedKeys[i] = i;
             }
 
+            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+                    recordDescriptors[0].getFields().length);
+
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 /**
                  * Input frames, one for each run file.
@@ -303,12 +308,16 @@
                  * Output frame.
                  */
                 private ByteBuffer outFrame, writerFrame;
-                private int outFrameOffset, writerFrameOffset;
+                private final FrameTupleAppender outAppender = new FrameTupleAppender(
+                        ctx.getFrameSize());
+                private FrameTupleAppender writerAppender;
 
                 private LinkedList<RunFileReader> runs;
 
                 private AggregateActivityState aggState;
 
+                private ArrayTupleBuilder finalTupleBuilder;
+
                 /**
                  * how many frames to be read ahead once
                  */
@@ -339,7 +348,7 @@
                             runs = new LinkedList<RunFileReader>(runs);
                             inFrames = new ArrayList<ByteBuffer>();
                             outFrame = ctx.allocateFrame();
-                            outFrameOffset = 0;
+                            outAppender.reset(outFrame, true);
                             outFrameAccessor.reset(outFrame);
                             while (runs.size() > 0) {
                                 try {
@@ -456,31 +465,25 @@
                                  * tuple builder
                                  */
 
-                                int stateLength = aggregator
-                                        .getBinaryAggregateStateLength(fta,
-                                                tupleIndex, aggregateState);
+                                tupleBuilder.reset();
 
-                                if (FrameToolsForGroupers.isFrameOverflowing(
-                                        outFrame, stateLength, (outFrameOffset == 0))) {
+                                aggregator.init(tupleBuilder, fta, tupleIndex,
+                                        aggregateState);
+
+                                if (!outAppender.appendSkipEmptyField(
+                                        tupleBuilder.getFieldEndOffsets(),
+                                        tupleBuilder.getByteArray(), 0,
+                                        tupleBuilder.getSize())) {
                                     flushOutFrame(writer, finalPass);
-                                    if (FrameToolsForGroupers
-                                            .isFrameOverflowing(outFrame,
-                                                    stateLength, (outFrameOffset == 0))) {
+                                    if (!outAppender.appendSkipEmptyField(
+                                            tupleBuilder.getFieldEndOffsets(),
+                                            tupleBuilder.getByteArray(), 0,
+                                            tupleBuilder.getSize())) {
                                         throw new HyracksDataException(
                                                 "The partial result is too large to be initialized in a frame.");
                                     }
                                 }
 
-                                aggregator.init(outFrame.array(),
-                                        outFrameOffset, fta, tupleIndex,
-                                        aggregateState);
-
-                                FrameToolsForGroupers
-                                        .updateFrameMetaForNewTuple(outFrame,
-                                                stateLength, (outFrameOffset == 0));
-
-                                outFrameOffset += stateLength;
-
                             } else {
                                 /**
                                  * if new tuple is in the same group of the
@@ -498,8 +501,9 @@
                                     runFileReaders, tupleAccessors, topTuples);
                         }
 
-                        if (outFrameOffset > 0) {
+                        if (outAppender.getTupleCount() > 0) {
                             flushOutFrame(writer, finalPass);
+                            outAppender.reset(outFrame, true);
                         }
 
                         aggregator.close();
@@ -521,68 +525,60 @@
 
                 private void flushOutFrame(IFrameWriter writer, boolean isFinal)
                         throws HyracksDataException {
+
+                    if (finalTupleBuilder == null) {
+                        finalTupleBuilder = new ArrayTupleBuilder(
+                                recordDescriptors[0].getFields().length);
+                    }
+
                     if (writerFrame == null) {
                         writerFrame = ctx.allocateFrame();
-                        writerFrameOffset = 0;
+                    }
+
+                    if (writerAppender == null) {
+                        writerAppender = new FrameTupleAppender(
+                                ctx.getFrameSize());
+                        writerAppender.reset(writerFrame, true);
                     }
 
                     outFrameAccessor.reset(outFrame);
 
                     for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
 
-                        int outputLen;
+                        finalTupleBuilder.reset();
 
                         if (isFinal) {
 
-                            outputLen = aggregator.getFinalOutputLength(
-                                    outFrameAccessor, i, aggregateState);
-
-                            if (FrameToolsForGroupers.isFrameOverflowing(
-                                    writerFrame, outputLen, (writerFrameOffset == 0))) {
-                                FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameOffset = 0;
-                                if (FrameToolsForGroupers.isFrameOverflowing(
-                                        writerFrame, outputLen, (writerFrameOffset == 0))) {
-                                    throw new HyracksDataException(
-                                            "Final aggregation output is too large to be fit into a frame.");
-                                }
-                            }
-
-                            aggregator.outputFinalResult(writerFrame.array(),
-                                    writerFrameOffset, outFrameAccessor, i,
+                            aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i,
                                     aggregateState);
 
                         } else {
 
-                            outputLen = aggregator.getPartialOutputLength(
-                                    outFrameAccessor, i, aggregateState);
-
-                            if (FrameToolsForGroupers.isFrameOverflowing(
-                                    writerFrame, outputLen, (writerFrameOffset == 0))) {
-                                FrameUtils.flushFrame(writerFrame, writer);
-                                writerFrameOffset = 0;
-                                if (FrameToolsForGroupers.isFrameOverflowing(
-                                        writerFrame, outputLen, (writerFrameOffset == 0))) {
-                                    throw new HyracksDataException(
-                                            "Final aggregation output is too large to be fit into a frame.");
-                                }
-                            }
-
-                            aggregator.outputPartialResult(writerFrame.array(),
-                                    writerFrameOffset, outFrameAccessor, i,
+                            aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i,
                                     aggregateState);
                         }
-
-                        FrameToolsForGroupers.updateFrameMetaForNewTuple(
-                                writerFrame, outputLen, (writerFrameOffset == 0));
-
-                        writerFrameOffset += outputLen;
+
+                        if (!writerAppender.appendSkipEmptyField(
+                                finalTupleBuilder.getFieldEndOffsets(),
+                                finalTupleBuilder.getByteArray(), 0,
+                                finalTupleBuilder.getSize())) {
+                            FrameUtils.flushFrame(writerFrame, writer);
+                            writerAppender.reset(writerFrame, true);
+                            if (!writerAppender.appendSkipEmptyField(
+                                    finalTupleBuilder.getFieldEndOffsets(),
+                                    finalTupleBuilder.getByteArray(), 0,
+                                    finalTupleBuilder.getSize())) {
+                                throw new HyracksDataException(
+                                        "Aggregation output is too large to fit into a frame.");
+                            }
+                        }
                     }
-                    if (writerFrameOffset > 0) {
+                    if (writerAppender.getTupleCount() > 0) {
                         FrameUtils.flushFrame(writerFrame, writer);
-                        writerFrameOffset = 0;
+                        writerAppender.reset(writerFrame, true);
                     }
-                    outFrameOffset = 0;
+
+                    outAppender.reset(outFrame, true);
                 }
 
                 private void setNextTopTuple(int runIndex, int[] tupleIndices,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index 853ae36..f4cfe98 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -28,7 +28,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 
 class GroupingHashTable {
@@ -78,11 +80,12 @@
     private final ITuplePartitionComputer tpc;
     private final IAggregatorDescriptor aggregator;
 
-    private int bufferOffset;
-    private int tupleCountInBuffer;
+    private final FrameTupleAppender appender;
 
     private final FrameTupleAccessor storedKeysAccessor;
 
+    private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
+
     GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
             IBinaryComparatorFactory[] comparatorFactories,
             ITuplePartitionComputerFactory tpcf,
@@ -127,7 +130,22 @@
         storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
                 storedKeysRecordDescriptor);
         lastBIndex = -1;
+
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+
         addNewBuffer();
+
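+        // Reserve one extra state-tuple field for the binary aggregate
+        // state when the group keys already fill the output record.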
+        if (fields.length < outRecordDescriptor.getFields().length) {
+            stateTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length);
+        } else {
+            stateTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length + 1);
+        }
+        outputTupleBuilder = new ArrayTupleBuilder(
+                outRecordDescriptor.getFields().length);
     }
 
     private void addNewBuffer() {
@@ -135,8 +151,7 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
         buffers.add(buffer);
-        bufferOffset = 0;
-        tupleCountInBuffer = 0;
+        appender.reset(buffer, true);
         ++lastBIndex;
     }
 
@@ -165,29 +180,22 @@
             // Add aggregation fields
             AggregateState newState = aggregator.createAggregateStates();
 
-            int initLength = aggregator.getBinaryAggregateStateLength(accessor,
-                    tIndex, newState);
+            stateTupleBuilder.reset();
 
-            if (FrameToolsForGroupers.isFrameOverflowing(
-                    buffers.get(lastBIndex), initLength, (bufferOffset == 0))) {
+            aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
+
+            if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                    stateTupleBuilder.getByteArray(), 0,
+                    stateTupleBuilder.getSize())) {
                 addNewBuffer();
-                if (bufferOffset + initLength > ctx.getFrameSize()) {
+                if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                        stateTupleBuilder.getByteArray(), 0,
+                        stateTupleBuilder.getSize())) {
                     throw new HyracksDataException(
-                            "Cannot initialize the aggregation state within a frame.");
+                            "Cannot init the aggregate state in a single frame.");
                 }
             }
-
-            aggregator.init(buffers.get(lastBIndex).array(), bufferOffset,
-                    accessor, tIndex, newState);
-
-            FrameToolsForGroupers.updateFrameMetaForNewTuple(
-                    buffers.get(lastBIndex), initLength, (bufferOffset == 0));
-
-            bufferOffset += initLength;
-
-            // Update tuple count in frame
-            tupleCountInBuffer++;
-
+
             if (accumulatorSize >= aggregateStates.length) {
                 aggregateStates = Arrays.copyOf(aggregateStates,
                         aggregateStates.length * 2);
@@ -195,7 +203,7 @@
 
             aggregateStates[saIndex] = newState;
 
-            link.add(lastBIndex, tupleCountInBuffer - 1, saIndex);
+            link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
 
         } else {
             aggregator.aggregate(accessor, tIndex, null, 0,
@@ -205,7 +213,7 @@
 
     void write(IFrameWriter writer) throws HyracksDataException {
         ByteBuffer buffer = ctx.allocateFrame();
-        int bufOffset = 0;
+        appender.reset(buffer, true);
 
         for (int i = 0; i < table.length; ++i) {
             Link link = table[i];
@@ -216,38 +224,34 @@
                     int aIndex = link.pointers[j + 2];
                     ByteBuffer keyBuffer = buffers.get(bIndex);
                     storedKeysAccessor.reset(keyBuffer);
-
-                    int outputLen = aggregator
-                            .getFinalOutputLength(storedKeysAccessor, tIndex,
-                                    aggregateStates[aIndex]);
-
-                    if (FrameToolsForGroupers.isFrameOverflowing(buffer,
-                            outputLen, (bufOffset == 0))) {
-                        writer.nextFrame(buffer);
-                        bufOffset = 0;
-                        if (FrameToolsForGroupers.isFrameOverflowing(buffer,
-                                outputLen, (bufOffset == 0))) {
-                            throw new HyracksDataException(
-                                    "Cannot write aggregation output in a frame.");
-                        }
-                    }
+
+                    outputTupleBuilder.reset();
 
                     aggregator
-                            .outputFinalResult(buffer.array(), bufOffset,
+                            .outputFinalResult(outputTupleBuilder, 
                                     storedKeysAccessor, tIndex,
                                     aggregateStates[aIndex]);
-
-                    FrameToolsForGroupers.updateFrameMetaForNewTuple(buffer,
-                            outputLen, (bufOffset == 0));
-
-                    bufOffset += outputLen;
+
+                    if (!appender.appendSkipEmptyField(
+                            outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0,
+                            outputTupleBuilder.getSize())) {
+                        writer.nextFrame(buffer);
+                        appender.reset(buffer, true);
+                        if (!appender.appendSkipEmptyField(
+                                outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0,
+                                outputTupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "Cannot write the aggregation output into a frame.");
+                        }
+                    }
 
                 }
             }
         }
-        if (bufOffset != 0) {
+        if (appender.getTupleCount() != 0) {
             writer.nextFrame(buffer);
-            bufOffset = 0;
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 5cff298..6bd053f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -30,7 +30,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -113,6 +115,7 @@
 
         final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
                 keyFields, storedKeys, comparators);
+
         final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
                 storedKeys, storedKeys, comparators);
 
@@ -133,13 +136,29 @@
         final AggregateState aggregateState = aggregator
                 .createAggregateStates();
 
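+        // As in GroupingHashTable: reserve one extra state-tuple field for
+        // the binary aggregate state when the keys fill the output record.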
+        final ArrayTupleBuilder stateTupleBuilder;
+        if (keyFields.length < outRecordDescriptor.getFields().length) {
+            stateTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length);
+        } else {
+            stateTupleBuilder = new ArrayTupleBuilder(
+                    outRecordDescriptor.getFields().length + 1);
+        }
+
+        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
+                outRecordDescriptor.getFields().length);
+
         return new ISpillableTable() {
 
             private int lastBufIndex;
-            private int outFrameOffset;
-            private int tupleCountInOutFrame;
 
             private ByteBuffer outputFrame;
+            private FrameTupleAppender outputAppender;
+
+            private FrameTupleAppender stateAppender = new FrameTupleAppender(
+                    ctx.getFrameSize());
 
             private final ISerializableTable table = new SerializableHashTable(
                     tableSize, ctx);
@@ -229,27 +246,29 @@
 
                 if (!foundGroup) {
 
-                    int initLen = aggregator.getBinaryAggregateStateLength(
-                            accessor, tIndex, aggregateState);
+                    stateTupleBuilder.reset();
 
-                    if (FrameToolsForGroupers.isFrameOverflowing(
-                            frames.get(lastBufIndex), initLen, (outFrameOffset == 0))) {
+                    aggregator.init(stateTupleBuilder, accessor, tIndex,
+                            aggregateState);
+                    if (!stateAppender.appendSkipEmptyField(
+                            stateTupleBuilder.getFieldEndOffsets(),
+                            stateTupleBuilder.getByteArray(), 0,
+                            stateTupleBuilder.getSize())) {
                         if (!nextAvailableFrame()) {
                             return false;
                         }
+                        if (!stateAppender.appendSkipEmptyField(
+                                stateTupleBuilder.getFieldEndOffsets(),
+                                stateTupleBuilder.getByteArray(), 0,
+                                stateTupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "Cannot init external aggregate state in a frame.");
+                        }
                     }
 
-                    aggregator.init(frames.get(lastBufIndex).array(),
-                            outFrameOffset, accessor, tIndex, aggregateState);
-
-                    FrameToolsForGroupers.updateFrameMetaForNewTuple(
-                            frames.get(lastBufIndex), initLen, (outFrameOffset == 0));
-
-                    outFrameOffset += initLen;
-                    tupleCountInOutFrame++;
-
                     storedTuplePointer.frameIndex = lastBufIndex;
-                    storedTuplePointer.tupleIndex = tupleCountInOutFrame - 1;
+                    storedTuplePointer.tupleIndex = stateAppender
+                            .getTupleCount() - 1;
                     table.insert(entry, storedTuplePointer);
                 } else {
 
@@ -277,7 +296,13 @@
                     outputFrame = ctx.allocateFrame();
                 }
 
-                int outputFrameOffset = 0;
+                if (outputAppender == null) {
+                    outputAppender = new FrameTupleAppender(
+                            outputFrame.capacity());
+                }
+
+                outputAppender.reset(outputFrame, true);
+
                 writer.open();
 
                 if (tPointers == null) {
@@ -295,64 +320,43 @@
 
                             storedKeysAccessor1.reset(frames.get(bIndex));
 
-                            int outputLen;
+                            outputTupleBuilder.reset();
 
                             if (isPartial) {
 
-                                outputLen = aggregator.getPartialOutputLength(
-                                        storedKeysAccessor1, tIndex,
-                                        aggregateState);
-
-                                if (FrameToolsForGroupers.isFrameOverflowing(
-                                        outputFrame, outputLen, (outputFrameOffset == 0))) {
-                                    FrameUtils.flushFrame(outputFrame, writer);
-                                    outputFrameOffset = 0;
-                                    if (FrameToolsForGroupers
-                                            .isFrameOverflowing(outputFrame,
-                                                    outputLen, (outputFrameOffset == 0))) {
-                                        throw new HyracksDataException(
-                                                "The output item is too large to be fit into a frame.");
-                                    }
-                                }
-
                                 aggregator.outputPartialResult(
-                                        outputFrame.array(), outputFrameOffset,
+                                        outputTupleBuilder,
                                         storedKeysAccessor1, tIndex,
                                         aggregateState);
 
                             } else {
-                                outputLen = aggregator.getFinalOutputLength(
-                                        storedKeysAccessor1, tIndex,
-                                        aggregateState);
-
-                                if (FrameToolsForGroupers.isFrameOverflowing(
-                                        outputFrame, outputLen, (outputFrameOffset == 0))) {
-                                    FrameUtils.flushFrame(outputFrame, writer);
-                                    outputFrameOffset = 0;
-                                    if (FrameToolsForGroupers
-                                            .isFrameOverflowing(outputFrame,
-                                                    outputLen, (outputFrameOffset == 0))) {
-                                        throw new HyracksDataException(
-                                                "The output item is too large to be fit into a frame.");
-                                    }
-                                }
 
                                 aggregator.outputFinalResult(
-                                        outputFrame.array(), outputFrameOffset,
+                                        outputTupleBuilder,
                                         storedKeysAccessor1, tIndex,
                                         aggregateState);
                             }
 
-                            FrameToolsForGroupers.updateFrameMetaForNewTuple(
-                                    outputFrame, outputLen, (outputFrameOffset == 0));
-
-                            outputFrameOffset += outputLen;
+                            if (!outputAppender.appendSkipEmptyField(
+                                    outputTupleBuilder.getFieldEndOffsets(),
+                                    outputTupleBuilder.getByteArray(), 0,
+                                    outputTupleBuilder.getSize())) {
+                                FrameUtils.flushFrame(outputFrame, writer);
+                                outputAppender.reset(outputFrame, true);
+                                if (!outputAppender.appendSkipEmptyField(
+                                        outputTupleBuilder.getFieldEndOffsets(),
+                                        outputTupleBuilder.getByteArray(), 0,
+                                        outputTupleBuilder.getSize())) {
+                                    throw new HyracksDataException(
+                                            "The output item is too large to fit into a frame.");
+                                }
+                            }
 
                         } while (true);
                     }
-                    if (outputFrameOffset > 0) {
+                    if (outputAppender.getTupleCount() > 0) {
                         FrameUtils.flushFrame(outputFrame, writer);
-                        outputFrameOffset = 0;
+                        outputAppender.reset(outputFrame, true);
                     }
                     aggregator.close();
                     return;
@@ -369,58 +376,41 @@
                     ByteBuffer buffer = frames.get(frameIndex);
                     storedKeysAccessor1.reset(buffer);
 
-                    int outputLen;
+                    outputTupleBuilder.reset();
 
                     if (isPartial) {
 
-                        outputLen = aggregator
-                                .getPartialOutputLength(storedKeysAccessor1,
-                                        tupleIndex, aggregateState);
-
-                        if (FrameToolsForGroupers.isFrameOverflowing(
-                                outputFrame, outputLen, (outputFrameOffset == 0))) {
-                            FrameUtils.flushFrame(outputFrame, writer);
-                            outputFrameOffset = 0;
-                            if (FrameToolsForGroupers.isFrameOverflowing(
-                                    outputFrame, outputLen, (outputFrameOffset == 0))) {
-                                throw new HyracksDataException(
-                                        "The output item is too large to be fit into a frame.");
-                            }
-                        }
-
-                        aggregator.outputPartialResult(outputFrame.array(),
-                                outputFrameOffset, storedKeysAccessor1,
-                                tupleIndex, aggregateState);
+                        aggregator
+                                .outputPartialResult(outputTupleBuilder,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState);
 
                     } else {
-                        outputLen = aggregator
-                                .getFinalOutputLength(storedKeysAccessor1,
-                                        tupleIndex, aggregateState);
 
-                        if (FrameToolsForGroupers.isFrameOverflowing(
-                                outputFrame, outputLen, (outputFrameOffset == 0))) {
-                            FrameUtils.flushFrame(outputFrame, writer);
-                            outputFrameOffset = 0;
-                            if (FrameToolsForGroupers.isFrameOverflowing(
-                                    outputFrame, outputLen, (outputFrameOffset == 0))) {
-                                throw new HyracksDataException(
-                                        "The output item is too large to be fit into a frame.");
-                            }
-                        }
-
-                        aggregator.outputFinalResult(outputFrame.array(),
-                                outputFrameOffset, storedKeysAccessor1,
-                                tupleIndex, aggregateState);
+                        aggregator
+                                .outputFinalResult(outputTupleBuilder,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState);
                     }
 
-                    FrameToolsForGroupers.updateFrameMetaForNewTuple(
-                            outputFrame, outputLen, (outputFrameOffset == 0));
-
-                    outputFrameOffset += outputLen;
+                    if (!outputAppender.appendSkipEmptyField(
+                            outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0,
+                            outputTupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputFrame, writer);
+                        outputAppender.reset(outputFrame, true);
+                        if (!outputAppender.appendSkipEmptyField(
+                                outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0,
+                                outputTupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "The output item is too large to fit into a frame.");
+                        }
+                    }
                 }
-                if (outputFrameOffset > 0) {
+                if (outputAppender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(outputFrame, writer);
-                    outputFrameOffset = 0;
+                    outputAppender.reset(outputFrame, true);
                 }
                 aggregator.close();
             }
@@ -453,8 +443,7 @@
                     frame.position(0);
                     frame.limit(frame.capacity());
                     frames.add(frame);
-                    outFrameOffset = 0;
-                    tupleCountInOutFrame = 0;
+                    stateAppender.reset(frame, true);
                     lastBufIndex = frames.size() - 1;
                 } else {
                     // Reuse an old frame
@@ -462,8 +451,7 @@
                     ByteBuffer frame = frames.get(lastBufIndex);
                     frame.position(0);
                     frame.limit(frame.capacity());
-                    outFrameOffset = 0;
-                    tupleCountInOutFrame = 0;
+                    stateAppender.reset(frame, true);
                 }
                 return true;
             }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 358bf42..329eb4b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -16,6 +16,7 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 /**
  *
@@ -30,20 +31,6 @@
     public AggregateState createAggregateStates();
 
     /**
-     * Get the length of the binary states.
-     * 
-     * @return
-     */
-    public int getBinaryAggregateStateLength(IFrameTupleAccessor accessor,
-            int tIndex, AggregateState state) throws HyracksDataException;
-
-    public int getPartialOutputLength(IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException;
-
-    public int getFinalOutputLength(IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException;
-
-    /**
      * Initialize the state based on the input tuple.
      * 
      * @param accessor
@@ -55,7 +42,7 @@
      *            The state to be initialized.
      * @throws HyracksDataException
      */
-    public void init(byte[] buf, int offset, IFrameTupleAccessor accessor,
+    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
             int tIndex, AggregateState state) throws HyracksDataException;
 
     /**
@@ -98,7 +85,7 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputPartialResult(byte[] data, int offset,
+    public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
 
@@ -114,7 +101,7 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputFinalResult(byte[] data, int offset,
+    public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
             IFrameTupleAccessor accessor, int tIndex, AggregateState state)
             throws HyracksDataException;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 1471318..32f4365 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -112,15 +112,6 @@
 
     public boolean needsObjectState();
 
-    public int getBinaryStateLength(IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException;
-
-    public int getPartialResultLength(byte[] data, int offset,
-            AggregateState state) throws HyracksDataException;
-
-    public int getFinalResultLength(byte[] data, int offset,
-            AggregateState state) throws HyracksDataException;
-
     public AggregateState createState();
 
     /**
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 8ff4942..17ad5f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -76,7 +76,7 @@
             @Override
             public void open() throws HyracksDataException {
                 pgw = new PreclusteredGroupWriter(ctx, groupFields,
-                        comparators, aggregator, inRecordDesc, writer);
+                        comparators, aggregator, inRecordDesc, recordDescriptors[0], writer);
                 pgw.open();
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 17f62f0..226642f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -21,7 +21,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PreclusteredGroupWriter implements IFrameWriter {
@@ -35,13 +37,14 @@
     private final FrameTupleAccessor copyFrameAccessor;
 
     private final ByteBuffer outFrame;
-    private int outFrameOffset;
+    private final FrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
 
     private boolean first;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
             IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
-            RecordDescriptor inRecordDesc, IFrameWriter writer) {
+            RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc, IFrameWriter writer) {
         this.groupFields = groupFields;
         this.comparators = comparators;
         this.aggregator = aggregator;
@@ -55,7 +58,10 @@
         copyFrameAccessor.reset(copyFrame);
 
         outFrame = ctx.allocateFrame();
-        outFrameOffset = 0;
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
+
+        tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
     }
 
     @Override
@@ -71,7 +77,9 @@
         for (int i = 0; i < nTuples; ++i) {
             if (first) {
 
-                aggregator.init(null, 0, inFrameAccessor, i, aggregateState);
+                tupleBuilder.reset();
+
+                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
 
                 first = false;
 
@@ -97,7 +105,8 @@
                 currTupleIndex)) {
             writeOutput(prevTupleAccessor, prevTupleIndex);
 
-            aggregator.init(null, 0, currTupleAccessor, currTupleIndex,
+            tupleBuilder.reset();
+            aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex,
                     aggregateState);
         } else {
             aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
@@ -108,26 +117,22 @@
     private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
             int lastTupleIndex) throws HyracksDataException {
 
-        int outLen = aggregator.getFinalOutputLength(lastTupleAccessor,
-                lastTupleIndex, aggregateState);
+        tupleBuilder.reset();
 
-        if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+        aggregator.outputFinalResult(tupleBuilder,
+                lastTupleAccessor, lastTupleIndex, aggregateState);
+
+        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
             FrameUtils.flushFrame(outFrame, writer);
-            outFrameOffset = 0;
-            if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+            appender.reset(outFrame, true);
+            if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                    tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                 throw new HyracksDataException(
                         "The output cannot be fit into a frame.");
             }
         }
 
-        aggregator.outputFinalResult(outFrame.array(), outFrameOffset,
-                lastTupleAccessor, lastTupleIndex, aggregateState);
-
-        FrameToolsForGroupers.updateFrameMetaForNewTuple(outFrame, outLen,
-                (outFrameOffset == 0));
-
-        outFrameOffset += outLen;
-
     }
 
     private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
@@ -158,7 +161,7 @@
         if (!first) {
             writeOutput(copyFrameAccessor,
                     copyFrameAccessor.getTupleCount() - 1);
-            if (outFrameOffset > 0) {
+            if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(outFrame, writer);
             }
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index c140a49..66eed9d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -171,29 +171,6 @@
                 return new AggregateState(new Integer[]{0, 0});
             }
 
-            @Override
-            public int getBinaryStateLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state)
-                    throws HyracksDataException {
-                if(useObjectState){
-                    return 0;
-                } else {
-                    return 8;
-                }
-            }
-
-            @Override
-            public int getPartialResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                return 8;
-            }
-
-            @Override
-            public int getFinalResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                return 4;
-            }
-
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index c6d4e99..cc30641 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -140,29 +140,6 @@
             public boolean needsBinaryState() {
                 return !useObjectState;
             }
-
-            @Override
-            public int getBinaryStateLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state)
-                    throws HyracksDataException {
-                if(useObjectState){
-                    return 0;
-                } else {
-                    return 8;
-                }
-            }
-
-            @Override
-            public int getPartialResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                return 8;
-            }
-
-            @Override
-            public int getFinalResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                return 4;
-            }
             
             @Override
             public AggregateState createState() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index e36a168..e320603 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -144,26 +144,6 @@
                     state.state = count;
                 }
             }
-
-            @Override
-            public int getBinaryStateLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) {
-                if(useObjectState)
-                    return 0;
-                return 4;
-            }
-
-            @Override
-            public int getPartialResultLength(byte[] data, int offset,
-                    AggregateState state) {
-                return 4;
-            }
-
-            @Override
-            public int getFinalResultLength(byte[] data, int offset,
-                    AggregateState state) {
-                return 4;
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 270ada0..a3c0aa9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -164,26 +164,6 @@
                     state.state = sum;
                 }
             }
-
-            @Override
-            public int getBinaryStateLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) {
-                if (useObjectState)
-                    return 0;
-                return 4;
-            }
-
-            @Override
-            public int getPartialResultLength(byte[] data, int offset,
-                    AggregateState state) {
-                return 4;
-            }
-
-            @Override
-            public int getFinalResultLength(byte[] data, int offset,
-                    AggregateState state) {
-                return 4;
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index b8656da..9c6ff68 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -215,80 +215,6 @@
                 return new AggregateState();
             }
 
-            @Override
-            public int getBinaryStateLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) {
-                int len = 0;
-
-                if (hasBinaryState) {
-                    len = 4;
-                }
-
-                return len;
-            }
-
-            @Override
-            public int getPartialResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                int len = 0;
-
-                if (hasBinaryState) {
-                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
-                            offset);
-
-                    Object[] storedState = (Object[]) state.state;
-
-                    len = getUTFLength(((String) (storedState[stateIdx])));
-                } else {
-                    len = getUTFLength((String) (state.state));
-                }
-
-                return len;
-            }
-
-            @Override
-            public int getFinalResultLength(byte[] data, int offset,
-                    AggregateState state) throws HyracksDataException {
-                int len = 0;
-
-                if (hasBinaryState) {
-                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
-                            offset);
-
-                    Object[] storedState = (Object[]) state.state;
-
-                    len = getUTFLength(((String) (storedState[stateIdx])));
-                } else {
-                    len = getUTFLength((String) (state.state));
-                }
-
-                return len;
-            }
-
-            private int getUTFLength(String str) throws HyracksDataException {
-                int utflen = 0;
-                int c = 0;
-
-                /* use charAt instead of copying String to char array */
-                for (int i = 0; i < str.length(); i++) {
-                    c = str.charAt(i);
-                    if ((c >= 0x0001) && (c <= 0x007F)) {
-                        utflen++;
-                    } else if (c > 0x07FF) {
-                        utflen += 3;
-                    } else {
-                        utflen += 2;
-                    }
-                }
-
-                if (utflen > 65535) {
-                    throw new HyracksDataException("encoded string too long: "
-                            + utflen + " bytes");
-                }
-
-                return utflen + 2;
-            }
-
         };
     }
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index c1d67fe..81eb45f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.FrameToolsForGroupers;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
@@ -74,18 +73,6 @@
             this.keys = keyFields;
         }
         
-        int stateTupleFieldCount = keys.length;
-        for (int i = 0; i < aggregators.length; i++) {
-            if (aggregators[i].needsBinaryState()) {
-                stateTupleFieldCount++;
-            }
-        }
-
-        final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(
-                stateTupleFieldCount);
-
-        final ArrayTupleBuilder resultTupleBuilder = new ArrayTupleBuilder(
-                outRecordDescriptor.getFields().length);
 
         return new IAggregatorDescriptor() {
 
@@ -97,16 +84,16 @@
             }
 
             @Override
-            public void outputPartialResult(byte[] buf, int offset,
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
 
-                resultTupleBuilder.reset();
+                tupleBuilder.reset();
                 for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    resultTupleBuilder.addField(accessor, tIndex,
+                    tupleBuilder.addField(accessor, tIndex,
                             keyFieldsInPartialResults[i]);
                 }
-                DataOutput dos = resultTupleBuilder.getDataOutput();
+                DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
@@ -117,29 +104,24 @@
                             fieldOffset + accessor.getFieldSlotsLength()
                                     + tupleOffset, ((AggregateState[]) state
                                     .state)[i]);
-                    resultTupleBuilder.addFieldEndOffset();
+                    tupleBuilder.addFieldEndOffset();
                 }
 
-                if (buf != null)
-                    FrameToolsForGroupers.writeFields(buf, offset, this
-                            .getPartialOutputLength(accessor, tIndex, state),
-                            resultTupleBuilder);
-
             }
 
             @Override
-            public void outputFinalResult(byte[] buf, int offset,
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
 
-                resultTupleBuilder.reset();
+                tupleBuilder.reset();
 
                 for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    resultTupleBuilder.addField(accessor, tIndex,
+                    tupleBuilder.addField(accessor, tIndex,
                             keyFieldsInPartialResults[i]);
                 }
 
-                DataOutput dos = resultTupleBuilder.getDataOutput();
+                DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
@@ -155,99 +137,28 @@
                         aggregators[i].outputFinalResult(dos, null, 0,
                                 ((AggregateState[]) state.state)[i]);
                     }
-                    resultTupleBuilder.addFieldEndOffset();
+                    tupleBuilder.addFieldEndOffset();
                 }
-
-                if (buf != null)
-                    FrameToolsForGroupers.writeFields(buf, offset,
-                            this.getFinalOutputLength(accessor, tIndex, state),
-                            resultTupleBuilder);
             }
 
             @Override
-            public int getPartialOutputLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state)
-                    throws HyracksDataException {
-                int stateLength = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-
-                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    stateLength += accessor.getFieldLength(tIndex,
-                            keyFieldsInPartialResults[i]);
-                    // add length for slot offset
-                    stateLength += 4;
-                }
-
-                for (int i = 0; i < aggregators.length; i++) {
-                    int fieldOffset = 0;
-                    if (aggregators[i].needsBinaryState()) {
-                        fieldOffset = accessor.getFieldStartOffset(tIndex,
-                                keys.length + i);
-                    }
-                    stateLength += aggregators[i].getPartialResultLength(
-                            accessor.getBuffer().array(), tupleOffset
-                                    + accessor.getFieldSlotsLength()
-                                    + fieldOffset,
-                            ((AggregateState[]) state.state)[i]);
-                    // add length for slot offset
-                    stateLength += 4;
-                }
-                return stateLength;
-            }
-
-            @Override
-            public int getFinalOutputLength(IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state)
-                    throws HyracksDataException {
-                int stateLength = 0;
-                int tupleOffset = accessor.getTupleStartOffset(tIndex);
-
-                for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    stateLength += accessor.getFieldLength(tIndex,
-                            keyFieldsInPartialResults[i]);
-                    // add length for slot offset
-                    stateLength += 4;
-                }
-
-                for (int i = 0; i < aggregators.length; i++) {
-                    int fieldOffset = 0;
-                    if (aggregators[i].needsBinaryState()) {
-                        fieldOffset = accessor.getFieldStartOffset(tIndex,
-                                keys.length + i);
-                    }
-                    stateLength += aggregators[i].getFinalResultLength(accessor
-                            .getBuffer().array(),
-                            tupleOffset + accessor.getFieldSlotsLength()
-                                    + fieldOffset, ((AggregateState[]) state
-                                    .state)[i]);
-                    // add length for slot offset
-                    stateLength += 4;
-                }
-                return stateLength;
-            }
-
-            @Override
-            public void init(byte[] buf, int offset,
+            public void init(ArrayTupleBuilder tupleBuilder,
                     IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-
-                stateTupleBuilder.reset();
+
+                tupleBuilder.reset();
                 for (int i = 0; i < keys.length; i++) {
-                    stateTupleBuilder.addField(accessor, tIndex, keys[i]);
+                    tupleBuilder.addField(accessor, tIndex, keys[i]);
                 }
-                DataOutput dos = stateTupleBuilder.getDataOutput();
+                DataOutput dos = tupleBuilder.getDataOutput();
 
                 for (int i = 0; i < aggregators.length; i++) {
                     aggregators[i].init(accessor, tIndex, dos,
                             ((AggregateState[]) state.state)[i]);
                     if (aggregators[i].needsBinaryState()) {
-                        stateTupleBuilder.addFieldEndOffset();
+                        tupleBuilder.addFieldEndOffset();
                     }
                 }
-                if (buf != null)
-                    FrameToolsForGroupers.writeFields(buf, offset, this
-                            .getBinaryAggregateStateLength(accessor, tIndex,
-                                    state), stateTupleBuilder);
             }
 
             @Override
@@ -260,31 +171,6 @@
             }
 
             @Override
-            public int getBinaryAggregateStateLength(
-                    IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
-                int stateLength = 0;
-
-                for (int i = 0; i < keys.length; i++) {
-                    stateLength += accessor
-                            .getFieldLength(tIndex, keys[i]);
-                    // add length for slot offset
-                    stateLength += 4;
-                }
-
-                for (int i = 0; i < aggregators.length; i++) {
-                    if (aggregators[i].needsBinaryState()) {
-                        stateLength += aggregators[i].getBinaryStateLength(
-                                accessor, tIndex,
-                                ((AggregateState[]) state.state)[i]);
-                        // add length for slot offset
-                        stateLength += 4;
-                    }
-                }
-                return stateLength;
-            }
-
-            @Override
             public void close() {
                 for (int i = 0; i < aggregators.length; i++) {
                     aggregators[i].close();
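
A note on the refactoring above: the aggregator methods no longer receive a raw byte[] plus offset, and the removed getPartialOutputLength()/getFinalOutputLength()/getBinaryAggregateStateLength() calculators are not replaced. outputPartialResult(), outputFinalResult(), and init() now only populate a caller-supplied ArrayTupleBuilder; deciding whether the built tuple fits in the output frame becomes the caller's job. A minimal caller-side sketch, assuming local names (aggregator, tupleBuilder, accessor, tIndex, aggState, appender, outFrame, writer) that are not part of this patch:

    // Sketch of the caller-side pattern this change enables (assumed names).
    aggregator.outputFinalResult(tupleBuilder, accessor, tIndex, aggState);
    if (!appender.append(tupleBuilder.getFieldEndOffsets(),
            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
        // The appender's capacity check replaces the removed length
        // calculators: flush the full frame and retry on an empty one.
        FrameUtils.flushFrame(outFrame, writer);
        appender.reset(outFrame, true);
        if (!appender.append(tupleBuilder.getFieldEndOffsets(),
                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
            throw new HyracksDataException("Result tuple larger than a frame.");
        }
    }
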
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 717cea5..042c919 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -59,1131 +59,1252 @@
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
 /**
  *
  */
 public class AggregationTests extends AbstractIntegrationTest {
 
-	final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-			new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-					 "data/tpch0.001/lineitem.tbl"))) });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+             "data/tpch0.001/lineitem.tbl"))) });
 
-	final RecordDescriptor desc = new RecordDescriptor(
-			new ISerializerDeserializer[] {
-					UTF8StringSerializerDeserializer.INSTANCE,
-					IntegerSerializerDeserializer.INSTANCE,
-					IntegerSerializerDeserializer.INSTANCE,
-					IntegerSerializerDeserializer.INSTANCE,
-					IntegerSerializerDeserializer.INSTANCE,
-					FloatSerializerDeserializer.INSTANCE,
-					FloatSerializerDeserializer.INSTANCE,
-					FloatSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE });
+    final RecordDescriptor desc = new RecordDescriptor(
+            new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE });
 
-	final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
-			new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-					IntegerParserFactory.INSTANCE,
-					IntegerParserFactory.INSTANCE,
-					IntegerParserFactory.INSTANCE,
-					IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-					FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE,
-					UTF8StringParserFactory.INSTANCE, }, '|');
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE, }, '|');
 
-	private AbstractSingleActivityOperatorDescriptor getPrinter(
-			JobSpecification spec, String prefix) throws IOException {
+    private AbstractSingleActivityOperatorDescriptor getPrinter(
+            JobSpecification spec, String prefix) throws IOException {
 
-		AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-				spec, new ConstantFileSplitProvider(new FileSplit[] {
-						new FileSplit(NC1_ID, createTempFile()
-								.getAbsolutePath()),
-						new FileSplit(NC2_ID, createTempFile()
-								.getAbsolutePath()) }), "\t");
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+                spec, new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile()
+                                .getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile()
+                                .getAbsolutePath()) }), "\t");
 
-		return printer;
-	}
+        return printer;
+    }
 
-	@Test
-	public void singleKeySumInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    @Test
+    public void singleKeySumInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-		int[] keyFields = new int[] { 0 };
-		int tableSize = 8;
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new IntSumFieldAggregatorFactory(3, true) }),
-				outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec, tableSize);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	@Test
-	public void singleKeySumPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    @Test
+    public void singleKeySumPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-		int[] keyFields = new int[] { 0 };
+        int[] keyFields = new int[] { 0 };
+
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                desc);
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new IntSumFieldAggregatorFactory(3, true) }),
-				outputRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeySumInmemGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeySumExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
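
Note on the test change above: PreclusteredGroupOperatorDescriptor assumes equal keys arrive adjacently, so this test (and the other PreCluster tests below) now routes the scan through an ExternalSortOperatorDescriptor instead of grouping raw scanner output. An illustrative sketch of why the ordering matters, using the wiring from this test:

    // A pre-clustered grouper compares each tuple with its predecessor and
    // closes a group on the first key change. On unsorted input such as
    //   (a,1) (a,2) (b,3) (a,4)
    // it would emit {a: 1,2}, {b: 3}, {a: 4}, wrongly splitting key 'a'.
    // Sorting on keyFields first makes equal keys adjacent, so each key
    // yields exactly one group:
    spec.connect(conn0, csvScanner, 0, sorter, 0); // partition, then sort
    spec.connect(conn1, sorter, 0, grouper, 0);    // grouper sees ordered keys
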
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+    @Test
+    public void singleKeySumExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		int[] keyFields = new int[] { 0 };
-		int frameLimits = 4;
-		int tableSize = 8;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new IntSumFieldAggregatorFactory(3, false) }),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new IntSumFieldAggregatorFactory(2, false) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 4;
+        int tableSize = 8;
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeySumExtGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumExtGroupTest");
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeyAvgInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+    @Test
+    public void singleKeyAvgInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		int[] keyFields = new int[] { 0 };
-		int tableSize = 8;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new CountFieldAggregatorFactory(true),
-								new AvgFieldGroupAggregatorFactory(1, true) }),
-				outputRec, tableSize);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec, tableSize);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgInmemGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeyAvgPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+    @Test
+    public void singleKeyAvgPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		int[] keyFields = new int[] { 0 };
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new CountFieldAggregatorFactory(true),
-								new AvgFieldGroupAggregatorFactory(1, true) }),
-				outputRec);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                desc);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgInmemGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeyAvgExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		int[] keyFields = new int[] { 0 };
-		int frameLimits = 4;
-		int tableSize = 8;
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new CountFieldAggregatorFactory(false),
-								new AvgFieldGroupAggregatorFactory(1, false) }),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new IntSumFieldAggregatorFactory(2, false),
-								new AvgFieldMergeAggregatorFactory(3, false) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+    @Test
+    public void singleKeyAvgExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgExtGroupTest");
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 4;
+        int tableSize = 8;
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new CountFieldAggregatorFactory(false),
+                                new AvgFieldGroupAggregatorFactory(1, false) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new AvgFieldMergeAggregatorFactory(3, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgExtGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		int[] keyFields = new int[] { 0 };
-		int tableSize = 8;
+        spec.addRoot(printer);
+        runTest(spec);
+    }
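
Note on the aggregator pairing above: the first MultiFieldsAggregatorFactory (group side) computes partials from raw input columns, while the second (merge side) recombines spilled partial tuples, so its field indices refer to columns of the partial output, not of the original input. A hedged reading of the mapping, assuming the partial layout implied by the group-side factory:

    // Assumed partial-tuple layout produced by the group-side factory:
    //   0: group key, 1: partial sum, 2: partial count, 3: avg state
    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
            new IntSumFieldAggregatorFactory(1, false),      // sum of partial sums
            new IntSumFieldAggregatorFactory(2, false),      // sum of partial counts
            new AvgFieldMergeAggregatorFactory(3, false) }); // merge avg states
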
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, false) }), outputRec, tableSize);
+    @Test
+    public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgInmemGroupTest");
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec, tableSize);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		int[] keyFields = new int[] { 0 };
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, false) }), outputRec);
+    @Test
+    public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgInmemGroupTest");
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        int[] keyFields = new int[] { 0 };
+
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                desc);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-	@Test
-	public void singleKeyMinMaxStringExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-		int[] keyFields = new int[] { 0 };
-		int frameLimits = 4;
-		int tableSize = 8;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, true) }),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new MinMaxStringFieldAggregatorFactory(2, true,
-										true) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 4;
+        int tableSize = 8;
+
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, true) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(2, true,
+                                        true) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
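
Note on the MinMaxStringFieldAggregatorFactory flags: relative to the in-memory variant earlier in this file, the external-group test flips the last constructor argument. Comparing the two call sites suggests that flag opts the aggregator into binary (spillable) state, which the external grouper needs when it writes runs to disk; this is an inference from the diff, not a documented contract:

    // In-memory hash grouper (never spills):
    new MinMaxStringFieldAggregatorFactory(15, true, false);
    // External grouper (may spill partial groups to run files):
    new MinMaxStringFieldAggregatorFactory(15, true, true);
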
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+    @Test
+    public void multiKeySumInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec,
-				new FieldHashPartitionComputerFactory(
-						keyFields,
-						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"singleKeyAvgExtGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        int[] keyFields = new int[] { 8, 0 };
+        int tableSize = 8;
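+        // In-memory hash group-by on the composite key (fields 8 and 0),
+        // computing integer sums over fields 1 and 3 of the input.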
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec, tableSize);
 
-	@Test
-	public void multiKeySumInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumInmemGroupTest");
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		int[] keyFields = new int[] { 8, 0 };
-		int tableSize = 8;
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new IntSumFieldAggregatorFactory(3, true) }),
-				outputRec, tableSize);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+    @Test
+    public void multiKeySumPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeySumInmemGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        int[] keyFields = new int[] { 8, 0 };
+
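+        // A pre-clustered grouper expects input already ordered on the
+        // grouping keys, so an external sort (4-frame budget) runs first.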
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE},
+                desc);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-	@Test
-	public void multiKeySumPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields, new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
+                outputRec);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		int[] keyFields = new int[] { 8, 0 };
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumPreClusterGroupTest");
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec, keyFields, new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new IntSumFieldAggregatorFactory(3, true) }),
-				outputRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeySumInmemGroupTest");
+    @Test
+    public void multiKeySumExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
-	@Test
-	public void multiKeySumExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        int[] keyFields = new int[] { 8, 0 };
+        int[] keys = new int[] { 0, 1 };
+        int frameLimits = 4;
+        int tableSize = 8;
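+        // keyFields addresses the group keys in the raw input; keys
+        // addresses the same keys after partial aggregation, where they
+        // occupy fields 0 and 1 of the intermediate records.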
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(keyFields,
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }),
+                new MultiFieldsAggregatorFactory(keys,
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		int[] keyFields = new int[] { 8, 0 };
-		int[] keys = new int[] {0, 1};
-		int frameLimits = 4;
-		int tableSize = 8;
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumExtGroupTest");
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(keyFields,
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new IntSumFieldAggregatorFactory(3, false) }),
-				new MultiFieldsAggregatorFactory(keys,
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(2, false),
-								new IntSumFieldAggregatorFactory(3, false) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] {
-										UTF8StringBinaryHashFunctionFactory.INSTANCE,
-										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeySumExtGroupTest");
+    @Test
+    public void multiKeyAvgInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-	@Test
-	public void multiKeyAvgInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        int[] keyFields = new int[] { 8, 0 };
+        int tableSize = 8;
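+        // In-memory hash group-by producing a sum over field 1, a count,
+        // and a running average over field 1 for each composite key.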
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec, tableSize);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		int[] keyFields = new int[] { 8, 0 };
-		int tableSize = 8;
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgInmemGroupTest");
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new CountFieldAggregatorFactory(true),
-								new AvgFieldGroupAggregatorFactory(1, true) }),
-				outputRec, tableSize);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyAvgInmemGroupTest");
+    @Test
+    public void multiKeyAvgPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-	@Test
-	public void multiKeyAvgPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        int[] keyFields = new int[] { 8, 0 };
+
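+        // Sort on both grouping keys before the pre-clustered grouper,
+        // which only combines adjacent tuples with equal keys.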
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE},
+                desc);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields, new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec);
 
-		int[] keyFields = new int[] { 8, 0 };
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec, keyFields, new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new CountFieldAggregatorFactory(true),
-								new AvgFieldGroupAggregatorFactory(1, true) }),
-				outputRec);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgPreClusterGroupTest");
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyAvgInmemGroupTest");
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+    @Test
+    public void multiKeyAvgExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-	@Test
-	public void multiKeyAvgExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        int[] keyFields = new int[] { 8, 0 };
+        int frameLimits = 4;
+        int tableSize = 8;
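+        // Two-phase average: AvgFieldGroupAggregatorFactory builds the
+        // partial state during local grouping, and AvgFieldMergeAggregatorFactory
+        // merges that state from field 4 of the intermediate records.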
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						FloatSerializerDeserializer.INSTANCE });
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new CountFieldAggregatorFactory(false),
+                                new AvgFieldGroupAggregatorFactory(1, false) }),
+                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false),
+                                new AvgFieldMergeAggregatorFactory(4, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-		int[] keyFields = new int[] { 8, 0 };
-		int frameLimits = 4;
-		int tableSize = 8;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new CountFieldAggregatorFactory(false),
-								new AvgFieldGroupAggregatorFactory(1, false) }),
-				new MultiFieldsAggregatorFactory(new int[]{0, 1},
-						new IFieldAggregateDescriptorFactory[] { 
-								new IntSumFieldAggregatorFactory(2, false),
-								new IntSumFieldAggregatorFactory(3, false),
-								new AvgFieldMergeAggregatorFactory(4, false) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] {
-										UTF8StringBinaryHashFunctionFactory.INSTANCE,
-										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgExtGroupTest");
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyAvgExtGroupTest");
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+    @Test
+    public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-	@Test
-	public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        int[] keyFields = new int[] { 8, 0 };
+        int tableSize = 8;
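+        // In-memory hash group-by computing an integer sum and a min/max
+        // aggregate over string field 15 of the input.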
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec, tableSize);
 
-		int[] keyFields = new int[] { 8, 0 };
-		int tableSize = 8;
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, false) }), outputRec, tableSize);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringInmemGroupTest");
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyMinMaxStringInmemGroupTest");
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+    @Test
+    public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-	@Test
-	public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        int[] keyFields = new int[] { 8, 0 };
+
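+        // Same pattern as the other pre-cluster tests: hash-partition to
+        // the sorter, sort on the grouping keys, then group the clustered
+        // stream.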
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                spec,
+                4,
+                keyFields,
+                null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE},
+                desc);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+                NC2_ID, NC1_ID);
 
-		int[] keyFields = new int[] { 8, 0 };
+        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
+
+        spec.connect(conn0, csvScanner, 0, sorter, 0);
 
-		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-				spec, keyFields, new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, true),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, false) }), outputRec);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields, new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, sorter, 0, grouper, 0);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyMinMaxStringPreClusterGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringPreClusterGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	@Test
-	public void multiKeyMinMaxStringExtGroupTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    @Test
+    public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-				spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						IntegerSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-		int[] keyFields = new int[] { 8, 0 };
-		int frameLimits = 4;
-		int tableSize = 8;
+        int[] keyFields = new int[] { 8, 0 };
+        int frameLimits = 4;
+        int tableSize = 8;
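+        // For the merge phase the intermediate layout is (key, key, sum,
+        // string), so the merge-side aggregators read fields 2 and 3.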
 
-		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-				spec,
-				keyFields,
-				frameLimits,
-				new IBinaryComparatorFactory[] {
-						UTF8StringBinaryComparatorFactory.INSTANCE,
-						UTF8StringBinaryComparatorFactory.INSTANCE },
-				new UTF8StringNormalizedKeyComputerFactory(),
-				new MultiFieldsAggregatorFactory(
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(1, false),
-								new MinMaxStringFieldAggregatorFactory(15,
-										true, true) }),
-				new MultiFieldsAggregatorFactory(new int[] {0, 1},
-						new IFieldAggregateDescriptorFactory[] {
-								new IntSumFieldAggregatorFactory(2, false),
-								new MinMaxStringFieldAggregatorFactory(3, true,
-										true) }),
-				outputRec,
-				new HashSpillableTableFactory(
-						new FieldHashPartitionComputerFactory(
-								keyFields,
-								new IBinaryHashFunctionFactory[] {
-										UTF8StringBinaryHashFunctionFactory.INSTANCE,
-										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-						tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, true) }),
+                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new MinMaxStringFieldAggregatorFactory(3, true,
+                                        true) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-				spec, new FieldHashPartitionComputerFactory(keyFields,
-						new IBinaryHashFunctionFactory[] {
-								UTF8StringBinaryHashFunctionFactory.INSTANCE,
-								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-		spec.connect(conn1, csvScanner, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-				"multiKeyMinMaxStringExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringExtGroupTest");
 
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
-		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-		spec.connect(conn2, grouper, 0, printer, 0);
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
 }