Fixed a bug on the merging phase of the external grouper; added a float sum aggregator (with test cases).

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1087 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 41da49a..1cab78d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -401,7 +401,6 @@
                                         setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
                                                 topTuples);
                                 } else {
-                                    closeRun(runIndex, runFileReaders, tupleAccessors);
                                     break;
                                 }
                             }
@@ -565,22 +564,20 @@
                          * If all tuples in the targeting frame have been
                          * checked.
                          */
-                        int frameOffset = runIndex * runFrameLimit;
                         tupleIndices[runIndex] = 0;
-                        currentFrameIndexInRun[runIndex] = frameOffset;
+                        currentFrameIndexInRun[runIndex] = runStart;
                         /**
                          * read in batch
                          */
                         currentRunFrames[runIndex] = 0;
-                        for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
-                            ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
-                            if (runCursors[runIndex].nextFrame(buffer)) {
-                                tupleAccessors[frameOffset].reset(buffer);
-                                if (tupleAccessors[frameOffset].getTupleCount() > 0) {
-                                    existNext = true;
-                                } else {
-                                    throw new IllegalStateException("illegal: empty run file");
-                                }
+                        for (int j = 0; j < runFrameLimit; j++) {
+                            int frameIndex = currentFrameIndexInRun[runIndex]
+                                    + j;
+                            if (runCursors[runIndex].nextFrame(inFrames
+                                    .get(frameIndex))) {
+                                tupleAccessors[frameIndex].reset(inFrames
+                                        .get(frameIndex));
+                                existNext = true;
                                 currentRunFrames[runIndex]++;
                             } else {
                                 break;
@@ -611,7 +608,10 @@
                     if (runCursors[index] != null) {
                         runCursors[index].close();
                         runCursors[index] = null;
-                        tupleAccessor[index] = null;
+                        int frameOffset = index * runFrameLimit;
+                        for (int j = 0; j < runFrameLimit; j++) {
+                            tupleAccessor[frameOffset + j] = null;
+                        }
                     }
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
new file mode 100644
index 0000000..f8b1d74
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+
+/**
+ *
+ */
+public class FloatSumFieldAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int aggField;
+
+    private final boolean useObjectState;
+    
+    public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState){
+        this.aggField = aggField;
+        this.useObjectState = useObjState;
+    }
+    
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        return new IFieldAggregateDescriptor() {
+            
+            @Override
+            public void reset() {
+                
+            }
+            
+            @Override
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+                    int offset, AggregateState state) throws HyracksDataException {
+                float sum;
+                if (!useObjectState) {
+                    sum = FloatSerializerDeserializer.getFloat(data, offset);
+                } else {
+                    sum = (Float) state.state;
+                }
+                try {
+                    fieldOutput.writeFloat(sum);
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+                    int offset, AggregateState state) throws HyracksDataException {
+                float sum;
+                if (!useObjectState) {
+                    sum = FloatSerializerDeserializer.getFloat(data, offset);
+                } else {
+                    sum = (Float) state.state;
+                }
+                try {
+                    fieldOutput.writeFloat(sum);
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+            
+            @Override
+            public boolean needsObjectState() {
+                return useObjectState;
+            }
+            
+            @Override
+            public boolean needsBinaryState() {
+                return !useObjectState;
+            }
+            
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex,
+                    DataOutput fieldOutput, AggregateState state)
+                    throws HyracksDataException {
+                float sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+
+                sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+                if (!useObjectState) {
+                    try {
+                        fieldOutput.writeFloat(sum);
+                    } catch (IOException e) {
+                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
+                    }
+                } else {
+                    state.state = sum;
+                }
+            }
+            
+            @Override
+            public AggregateState createState() {
+                return new AggregateState(new Float(0.0));
+            }
+            
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+                
+            }
+            
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
+                float sum = 0;
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+                if (!useObjectState) {
+                    ByteBuffer buf = ByteBuffer.wrap(data);
+                    sum += buf.getFloat(offset);
+                    buf.putFloat(offset, sum);
+                } else {
+                    sum += (Float) state.state;
+                    state.state = sum;
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 2813a62..53a3f79 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -57,6 +57,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
@@ -66,34 +67,54 @@
  */
 public class AggregationTest extends AbstractIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
-            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                    "data/tpch0.001/lineitem.tbl"))) });
 
-    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+    final RecordDescriptor desc = new RecordDescriptor(
+            new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE });
 
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, }, '|');
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE, }, '|');
 
-    private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
-            throws IOException {
+    private AbstractSingleActivityOperatorDescriptor getPrinter(
+            JobSpecification spec, String prefix) throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
-                new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
-                        new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+                spec, new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile()
+                                .getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile()
+                                .getAbsolutePath()) }), "\t");
 
         return printer;
     }
@@ -102,38 +123,54 @@
     public void singleKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true),
+                                new FloatSumFieldAggregatorFactory(5, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -146,34 +183,49 @@
     public void singleKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE});
 
         int[] keyFields = new int[] { 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true),
+                                new FloatSumFieldAggregatorFactory(5, true)}),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -186,41 +238,64 @@
     public void singleKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE});
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(2, false) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false),
+                                new FloatSumFieldAggregatorFactory(5, false)}),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new FloatSumFieldAggregatorFactory(3, false)}),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -233,38 +308,54 @@
     public void singleKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
-                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -277,34 +368,49 @@
     public void singleKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
-                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -317,43 +423,64 @@
     public void singleKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
-                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new CountFieldAggregatorFactory(false),
+                                new AvgFieldGroupAggregatorFactory(1, false) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
                                 new IntSumFieldAggregatorFactory(2, false),
-                                new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+                                new AvgFieldMergeAggregatorFactory(3, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -366,38 +493,52 @@
     public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -410,34 +551,47 @@
     public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -450,42 +604,63 @@
     public void singleKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, true) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(2, true,
+                                        true) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -498,39 +673,58 @@
     public void multiKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec, keyFields, new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -543,35 +737,51 @@
     public void multiKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -584,43 +794,69 @@
     public void multiKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
-                                new IntSumFieldAggregatorFactory(3, false) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY),
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -633,40 +869,60 @@
     public void multiKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
-                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec, keyFields, new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -679,36 +935,53 @@
     public void multiKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
-                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new CountFieldAggregatorFactory(true),
+                                new AvgFieldGroupAggregatorFactory(1, true) }),
+                outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -721,46 +994,72 @@
     public void multiKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
-                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new CountFieldAggregatorFactory(false),
+                                new AvgFieldGroupAggregatorFactory(1, false) }),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
                                 new IntSumFieldAggregatorFactory(3, false),
-                                new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                                new AvgFieldMergeAggregatorFactory(4, false) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY),
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -773,39 +1072,58 @@
     public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec, keyFields, new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -818,35 +1136,51 @@
     public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+                spec, keyFields,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, false) }), outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringPreClusterGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -859,44 +1193,71 @@
     public void multiKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
-                new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(2, false),
-                        new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
-                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15,
+                                        true, true) }),
+                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(2, false),
+                                new MinMaxStringFieldAggregatorFactory(3, true,
+                                        true) }),
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY),
+                                        PointableBinaryHashFunctionFactory
+                                                .of(UTF8StringPointable.FACTORY) }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec, new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory
+                                        .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "multiKeyMinMaxStringExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);