Fixed bugs in key-field arrangement when partial grouping values are stored; added test cases to benchmark the new aggregator interface.

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@962 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index f273c37..4cda69e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -33,6 +33,10 @@
     public void reset() {
         state = null;
     }
+    
+    public void close() {
+        state = null;
+    }
 
     public Object getState() {
         return state;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index 31d406d..2327965 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -274,9 +274,14 @@
                         .createBinaryComparator();
             }
 
+            int[] keyFieldsInPartialResults = new int[keyFields.length];
+            for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+                keyFieldsInPartialResults[i] = i;
+            }
+            
             final IAggregatorDescriptor aggregator = mergerFactory
                     .createAggregator(ctx, recordDescriptors[0],
-                            recordDescriptors[0], keyFields);
+                            recordDescriptors[0], keyFields, keyFieldsInPartialResults);
             final AggregateState aggregateState = aggregator
                     .createAggregateStates();
 
@@ -350,6 +355,7 @@
                         writer.fail();
                         throw new HyracksDataException(e);
                     } finally {
+                        aggregateState.close();
                         writer.close();
                     }
                 }
@@ -421,6 +427,7 @@
                                                 runFileReaders, tupleAccessors,
                                                 topTuples);
                                 } else {
+                                    closeRun(runIndex, runFileReaders, tupleAccessors);
                                     break;
                                 }
                             }
@@ -622,6 +629,7 @@
                     if (runCursors[index] != null) {
                         runCursors[index].close();
                         runCursors[index] = null;
+                        tupleAccessor[index] = null;
                     }
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index f38eeb3..d5edd25 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -108,8 +108,13 @@
         ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
         tpc = tpcf.createPartitioner();
 
+        int[] keyFieldsInPartialResults = new int[fields.length];
+        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+            keyFieldsInPartialResults[i] = i;
+        }
+        
         this.aggregator = aggregatorFactory.createAggregator(ctx,
-                inRecordDescriptor, outRecordDescriptor, fields);
+                inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
 
         this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
         accumulatorSize = 0;
@@ -217,4 +222,10 @@
             flushFrame(appender, writer);
         }
     }
+    
+    void close() throws HyracksDataException {
+        for(AggregateState aState : aggregateStates){
+            aState.close();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index d13cc5d..b20197d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -124,9 +124,14 @@
         final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
                 : firstKeyNormalizerFactory.createNormalizedKeyComputer();
 
+        int[] keyFieldsInPartialResults = new int[keyFields.length];
+        for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+            keyFieldsInPartialResults[i] = i;
+        }
+        
         final IAggregatorDescriptor aggregator = aggregateFactory
                 .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
-                        keyFields);
+                        keyFields, keyFieldsInPartialResults);
 
         final AggregateState aggregateState = aggregator
                 .createAggregateStates();
@@ -356,6 +361,7 @@
                 tPointers = null;
                 table.close();
                 frames.clear();
+                aggregateState.close();
             }
 
             /**
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
index 65b3873..a0802c8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
@@ -27,6 +27,6 @@
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) throws HyracksDataException;
     
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
index 116992d..1c1c7a2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
@@ -61,7 +61,7 @@
                 .getInputRecordDescriptor(getOperatorId(), 0);
         final IAggregatorDescriptor aggregator = aggregatorFactory
                 .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
-                        groupFields);
+                        groupFields, groupFields);
         final ByteBuffer copyFrame = ctx.allocateFrame();
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
                 ctx.getFrameSize(), inRecordDesc);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
index f8e10ef..db6e4ab 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -128,6 +128,7 @@
                 FrameUtils.flushFrame(appender.getBuffer(), writer);
             }
         }
+        aggregateState.close();
         writer.close();
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index 6f92cf1..0ae62e5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -55,7 +55,7 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields)
+            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
             throws HyracksDataException {
 
         final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
@@ -101,9 +101,9 @@
                     AggregateState state) throws HyracksDataException {
                 if (!outputPending) {
                     resultTupleBuilder.reset();
-                    for (int i = 0; i < keyFields.length; i++) {
+                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
                         resultTupleBuilder.addField(accessor, tIndex,
-                                keyFields[i]);
+                        		keyFieldsInPartialResults[i]);
                     }
                     DataOutput dos = resultTupleBuilder.getDataOutput();
 
@@ -137,9 +137,9 @@
                 if (!outputPending) {
                     resultTupleBuilder.reset();
                     
-                    for (int i = 0; i < keyFields.length; i++) {
+                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
                         resultTupleBuilder.addField(accessor, tIndex,
-                                keyFields[i]);
+                        		keyFieldsInPartialResults[i]);
                     }
  
                     DataOutput dos = resultTupleBuilder.getDataOutput();
@@ -230,7 +230,9 @@
 
             @Override
             public void close() {
-                // TODO Auto-generated method stub
+                for(int i = 0; i < aggregators.length; i++){
+                    aggregators[i].close();
+                }
             }
 
             @Override
@@ -275,9 +277,9 @@
                     AggregateState state) throws HyracksDataException {
                 if (!initPending) {
                     stateTupleBuilder.reset();
-                    for (int i = 0; i < keyFields.length; i++) {
+                    for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
                         stateTupleBuilder.addField(accessor, tIndex,
-                                keyFields[i]);
+                        		keyFieldsInPartialResults[i]);
                     }
                     DataOutput dos = stateTupleBuilder.getDataOutput();
 
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index d89709f..d8df3a8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -64,562 +64,1124 @@
  */
 public class AggregationTests extends AbstractIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                    "data/tpch0.001/lineitem.tbl"))) });
+	final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+			new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+					"data/tpch0.001/lineitem.tbl"))) });
 
-    final RecordDescriptor desc = new RecordDescriptor(
-            new ISerializerDeserializer[] {
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE });
+	final RecordDescriptor desc = new RecordDescriptor(
+			new ISerializerDeserializer[] {
+					UTF8StringSerializerDeserializer.INSTANCE,
+					IntegerSerializerDeserializer.INSTANCE,
+					IntegerSerializerDeserializer.INSTANCE,
+					IntegerSerializerDeserializer.INSTANCE,
+					IntegerSerializerDeserializer.INSTANCE,
+					FloatSerializerDeserializer.INSTANCE,
+					FloatSerializerDeserializer.INSTANCE,
+					FloatSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE,
+					UTF8StringSerializerDeserializer.INSTANCE });
 
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
-            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE, }, '|');
+	final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+			new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+					IntegerParserFactory.INSTANCE,
+					IntegerParserFactory.INSTANCE,
+					IntegerParserFactory.INSTANCE,
+					IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+					FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE,
+					UTF8StringParserFactory.INSTANCE, }, '|');
 
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, String prefix) throws IOException {
+	private AbstractSingleActivityOperatorDescriptor getPrinter(
+			JobSpecification spec, String prefix) throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-                spec, new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit(NC1_ID, createTempFile()
-                                .getAbsolutePath()),
-                        new FileSplit(NC2_ID, createTempFile()
-                                .getAbsolutePath()) }), "\t");
+		AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+				spec, new ConstantFileSplitProvider(new FileSplit[] {
+						new FileSplit(NC1_ID, createTempFile()
+								.getAbsolutePath()),
+						new FileSplit(NC2_ID, createTempFile()
+								.getAbsolutePath()) }), "\t");
 
-        return printer;
-    }
+		return printer;
+	}
 
-    @Test
-    public void singleKeySumInmemGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void singleKeySumInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
-        int tableSize = 8;
+		int[] keyFields = new int[] { 0 };
+		int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
-                outputRec, tableSize);
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new IntSumFieldAggregatorFactory(3, true) }),
+				outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-    
-    @Test
-    public void singleKeySumPreClusterGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+	@Test
+	public void singleKeySumPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new IntSumFieldAggregatorFactory(3, true) }),
+				outputRec);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeySumInmemGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeySumExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
+
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new IntSumFieldAggregatorFactory(3, false) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new IntSumFieldAggregatorFactory(2, false) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeySumExtGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyAvgInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+		int tableSize = 8;
+
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new CountFieldAggregatorFactory(true),
+								new AvgFieldAggregatorFactory(1, true) }),
+				outputRec, tableSize);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyAvgInmemGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyAvgPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new CountFieldAggregatorFactory(true),
+								new AvgFieldAggregatorFactory(1, true) }),
+				outputRec);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyAvgInmemGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyAvgExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
+
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new CountFieldAggregatorFactory(false),
+								new AvgFieldAggregatorFactory(1, false) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new IntSumFieldAggregatorFactory(2, false),
+								new AvgFieldAggregatorFactory(3, false) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyAvgExtGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+		int tableSize = 8;
+
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, false) }), outputRec, tableSize);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyMinMaxStringInmemGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, false) }), outputRec);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyMinMaxStringPreClusterGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
+
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, true) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new MinMaxStringFieldAggregatorFactory(2, true,
+										true) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec,
+				new FieldHashPartitionComputerFactory(
+						keyFields,
+						new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"singleKeyMinMaxStringExtGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void multiKeySumInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 8, 0 };
+		int tableSize = 8;
+
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new IntSumFieldAggregatorFactory(3, true) }),
+				outputRec, tableSize);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeySumInmemGroupTest");
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
+
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
+
+		spec.addRoot(printer);
+		runTest(spec);
+	}
+
+	@Test
+	public void multiKeySumPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
+
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
+
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
+
+		int[] keyFields = new int[] { 8, 0 };
 
-        int[] keyFields = new int[] { 0 };
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec, keyFields, new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new IntSumFieldAggregatorFactory(3, true) }),
+				outputRec);
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
-                outputRec);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeySumPreClusterGroupTest");
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+	@Test
+	public void multiKeySumExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-    @Test
-    public void singleKeySumExtGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE });
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+		int[] keyFields = new int[] { 8, 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
 
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
-        int tableSize = 8;
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new IntSumFieldAggregatorFactory(3, false) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(2, false),
+								new IntSumFieldAggregatorFactory(3, false) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] {
+										UTF8StringBinaryHashFunctionFactory.INSTANCE,
+										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new IntSumFieldAggregatorFactory(3, false) }),
-                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new IntSumFieldAggregatorFactory(2, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeySumExtGroupTest");
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumExtGroupTest");
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+	@Test
+	public void multiKeyAvgInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-    @Test
-    public void singleKeyAvgInmemGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+		int[] keyFields = new int[] { 8, 0 };
+		int tableSize = 8;
 
-        int[] keyFields = new int[] { 0 };
-        int tableSize = 8;
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new CountFieldAggregatorFactory(true),
+								new AvgFieldAggregatorFactory(1, true) }),
+				outputRec, tableSize);
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new CountFieldAggregatorFactory(true),
-                        new AvgFieldAggregatorFactory(1, true) }), outputRec, tableSize);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyAvgInmemGroupTest");
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-    
-    @Test
-    public void singleKeyAvgPreClusterGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void multiKeyAvgPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
+		int[] keyFields = new int[] { 8, 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new CountFieldAggregatorFactory(true),
-                        new AvgFieldAggregatorFactory(1, true) }), outputRec);
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec, keyFields, new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new CountFieldAggregatorFactory(true),
+								new AvgFieldAggregatorFactory(1, true) }),
+				outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyAvgPreClusterGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-    @Test
-    public void singleKeyAvgExtGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void multiKeyAvgExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						FloatSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
-        int tableSize = 8;
+		int[] keyFields = new int[] { 8, 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new CountFieldAggregatorFactory(false),
-                        new AvgFieldAggregatorFactory(1, false) }),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new IntSumFieldAggregatorFactory(2, false),
-                        new AvgFieldAggregatorFactory(3, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new CountFieldAggregatorFactory(false),
+								new AvgFieldAggregatorFactory(1, false) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(2, false),
+								new IntSumFieldAggregatorFactory(3, false),
+								new AvgFieldAggregatorFactory(4, false) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] {
+										UTF8StringBinaryHashFunctionFactory.INSTANCE,
+										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-    @Test
-    public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
-        int tableSize = 8;
+		int[] keyFields = new int[] { 8, 0 };
+		int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }),
-                outputRec, tableSize);
+		HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyMinMaxStringInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-    @Test
-    public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
+		int[] keyFields = new int[] { 8, 0 };
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, true),
-                        new MinMaxStringFieldAggregatorFactory(15, true, false) }),
-                outputRec);
+		PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+				spec, keyFields, new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, true),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, false) }), outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyMinMaxStringPreClusterGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
-    
-    @Test
-    public void singleKeyMinMaxStringExtGroupTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
+	@Test
+	public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+		JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+		FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+				spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+				csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+		RecordDescriptor outputRec = new RecordDescriptor(
+				new ISerializerDeserializer[] {
+						UTF8StringSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE,
+						IntegerSerializerDeserializer.INSTANCE,
+						UTF8StringSerializerDeserializer.INSTANCE });
 
-        int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
-        int tableSize = 8;
+		int[] keyFields = new int[] { 8, 0 };
+		int frameLimits = 4;
+		int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new MinMaxStringFieldAggregatorFactory(15, true, true) }),
-                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false),
-                        new MinMaxStringFieldAggregatorFactory(2, true, true) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+		ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+				spec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] {
+						UTF8StringBinaryComparatorFactory.INSTANCE,
+						UTF8StringBinaryComparatorFactory.INSTANCE },
+				new UTF8StringNormalizedKeyComputerFactory(),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(1, false),
+								new MinMaxStringFieldAggregatorFactory(15,
+										true, true) }),
+				new MultiFieldsAggregatorFactory(
+						new IFieldAggregateDescriptorFactory[] {
+								new IntSumFieldAggregatorFactory(2, false),
+								new MinMaxStringFieldAggregatorFactory(3, true,
+										true) }),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] {
+										UTF8StringBinaryHashFunctionFactory.INSTANCE,
+										UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+						tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, csvScanner, 0, grouper, 0);
+		IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+				spec, new FieldHashPartitionComputerFactory(keyFields,
+						new IBinaryHashFunctionFactory[] {
+								UTF8StringBinaryHashFunctionFactory.INSTANCE,
+								UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+		spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+		AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+				"multiKeyMinMaxStringExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+				NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
-        spec.connect(conn2, grouper, 0, printer, 0);
+		IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+		spec.connect(conn2, grouper, 0, printer, 0);
 
-        spec.addRoot(printer);
-        runTest(spec);
-    }
+		spec.addRoot(printer);
+		runTest(spec);
+	}
 
 }
diff --git a/hyracks-examples/text-example/textclient/pom.xml b/hyracks-examples/text-example/textclient/pom.xml
index 2642d31..c6f3832 100644
--- a/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks-examples/text-example/textclient/pom.xml
@@ -41,6 +41,7 @@
         <artifactId>appassembler-maven-plugin</artifactId>
         <executions>
           <execution>
+          <id>textclient</id>
             <configuration>
               <programs>
                 <program>
@@ -56,6 +57,23 @@
               <goal>assemble</goal>
             </goals>
           </execution>
+          <execution>
+          	<id>groupclient</id>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.text.client.ExternalGroupClient</mainClass>
+                  <name>groupclient</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 68e33cb..61b4ad8 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -42,6 +42,11 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
@@ -101,6 +106,9 @@
 
         @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
         public boolean outPlain = true;
+        
+        @Option(name = "-algo", usage = "The algorithm to be used", required = true)
+        public int algo;
     }
 
     /**
@@ -111,14 +119,17 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
+                options.port);
 
         JobSpecification job;
 
-        for (int i = 0; i < 3; i++) {
+        for (int i = 0; i < 6; i++) {
             long start = System.currentTimeMillis();
-            job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
-                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 3, options.outPlain);
+            job = createJob(parseFileSplits(options.inFileSplits),
+                    parseFileSplits(options.outFileSplits, i),
+                    options.htSize, options.sbSize, options.framesLimit,
+                    options.sortOutput, options.algo, options.outPlain);
 
             System.out.print(i + "\t" + (System.currentTimeMillis() - start));
             start = System.currentTimeMillis();
@@ -136,9 +147,11 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
+                throw new IllegalArgumentException("File split " + s
+                        + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+                    new File(s.substring(idx + 1))));
         }
         return fSplits;
     }
@@ -150,46 +163,70 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
+                throw new IllegalArgumentException("File split " + s
+                        + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
-                    + count)));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+                    new File(s.substring(idx + 1) + "_" + count)));
         }
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
-            int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
+    private static JobSpecification createJob(FileSplit[] inSplits,
+            FileSplit[] outSplits, int htSize, int sbSize, int framesLimit,
+            boolean sortOutput, int alg, boolean outPlain) {
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(
+                inSplits);
 
-        RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor inDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
+        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(
+                spec, splitsProvider, new DelimitedDataTupleParserFactory(
+                        new IValueParserFactory[] {
+                                IntegerParserFactory.INSTANCE,
+                                IntegerParserFactory.INSTANCE,
+                                IntegerParserFactory.INSTANCE,
+                                IntegerParserFactory.INSTANCE,
+                                IntegerParserFactory.INSTANCE,
+                                FloatParserFactory.INSTANCE,
+                                FloatParserFactory.INSTANCE,
+                                FloatParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE, }, '|'),
+                inDesc);
 
         createPartitionConstraint(spec, fileScanner, inSplits);
 
         // Output: each unique string with an integer count
-        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE,
-                // IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        IntegerSerializerDeserializer.INSTANCE,
+                        // IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         // Specify the grouping key, which will be the string extracted during
         // the scan.
@@ -200,128 +237,246 @@
         AbstractOperatorDescriptor grouper;
 
         switch (alg) {
-            case 0: // External hash group
-                grouper = new ExternalGroupOperatorDescriptor(
-                        spec,
-                        keys,
-                        framesLimit,
-                        new IBinaryComparatorFactory[] {
-                        // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE },
-                        new IntegerNormalizedKeyComputerFactory(),
-                        new MultiAggregatorDescriptorFactory(
-                                new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
-                        new MultiAggregatorDescriptorFactory(
-                                new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
-                        outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
-                                new IBinaryHashFunctionFactory[] {
-                                // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
+        case 0: // External hash group
+            grouper = new ExternalGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    framesLimit,
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new MultiAggregatorDescriptorFactory(
+                            new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+                    new MultiAggregatorDescriptorFactory(
+                            new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
+                                    keys.length) }),
+                    outDesc,
+                    new HashSpillableGroupingTableFactory(
+                            new FieldHashPartitionComputerFactory(keys,
+                                    new IBinaryHashFunctionFactory[] {
+                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
+                            htSize), false);
 
-                createPartitionConstraint(spec, grouper, outSplits);
+            createPartitionConstraint(spec, grouper, outSplits);
 
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }));
-                spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
-                break;
-            case 1: // External sort + pre-cluster
-                ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
-                        new IBinaryComparatorFactory[] {
-                        // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
-                createPartitionConstraint(spec, sorter, inSplits);
+            // Connect scanner with the grouper
+            IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
+            break;
+        case 1: // External sort + pre-cluster
+            ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+                    spec, framesLimit, keys, new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+            createPartitionConstraint(spec, sorter, inSplits);
 
-                // Connect scan operator with the sorter
-                IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }));
-                spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
+            // Connect scan operator with the sorter
+            IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
 
-                grouper = new PreclusteredGroupOperatorDescriptor(spec, keys, new IBinaryComparatorFactory[] {
-                // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc);
+            grouper = new PreclusteredGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    outDesc);
 
-                createPartitionConstraint(spec, grouper, outSplits);
+            createPartitionConstraint(spec, grouper, outSplits);
 
-                // Connect sorter with the pre-cluster
-                OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
-                spec.connect(sortGroupConn, sorter, 0, grouper, 0);
-                break;
-            case 2: // In-memory hash group
-                grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
-                        new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                        IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
-                // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
-                                new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
-                        htSize);
+            // Connect sorter with the pre-cluster
+            OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
+                    spec);
+            spec.connect(sortGroupConn, sorter, 0, grouper, 0);
+            break;
+        case 2: // In-memory hash group
+            grouper = new HashGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    outDesc, htSize);
 
-                createPartitionConstraint(spec, grouper, outSplits);
+            createPartitionConstraint(spec, grouper, outSplits);
 
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }));
-                spec.connect(scanConn, fileScanner, 0, grouper, 0);
-                break;
-            default:
-                grouper = new ExternalGroupOperatorDescriptor(
-                        spec,
-                        keys,
-                        framesLimit,
-                        new IBinaryComparatorFactory[] {
-                        // IntegerBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE },
-                        new IntegerNormalizedKeyComputerFactory(),
-                        new MultiAggregatorDescriptorFactory(
-                                new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
-                        new MultiAggregatorDescriptorFactory(
-                                new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
-                        outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
-                                new IBinaryHashFunctionFactory[] {
-                                // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
+            // Connect scanner with the grouper
+            IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanConn, fileScanner, 0, grouper, 0);
+            break;
+        case 3: // new external hash group (new aggregator interface)
+            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    framesLimit,
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
+                                    false) }),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
+                                    keys.length, false) }), outDesc,
+                    new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
+                                    new IBinaryHashFunctionFactory[] {
+                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
+                            htSize), false);
+
+            createPartitionConstraint(spec, grouper, outSplits);
 
-                createPartitionConstraint(spec, grouper, outSplits);
+            // Connect scanner with the grouper
+            IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
+
+            break;
+        case 4: // External sort + new pre-cluster group (new aggregator interface)
+            ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(
+                    spec, framesLimit, keys, new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+            createPartitionConstraint(spec, sorter2, inSplits);
 
-                // Connect scanner with the grouper
-                IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
-                        // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                IntegerBinaryHashFunctionFactory.INSTANCE }));
-                spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+            // Connect scan operator with the sorter
+            IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
+
+            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    outDesc);
+
+            createPartitionConstraint(spec, grouper, outSplits);
+
+            // Connect sorter with the pre-cluster
+            OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(
+                    spec);
+            spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
+            break;
+        case 5: // In-memory hash group (new aggregator interface)
+            grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    outDesc, htSize);
+
+            createPartitionConstraint(spec, grouper, outSplits);
+
+            // Connect scanner with the grouper
+            IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanConn2, fileScanner, 0, grouper, 0);
+            break;
+        default:
+            grouper = new ExternalGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    framesLimit,
+                    new IBinaryComparatorFactory[] {
+                    // IntegerBinaryComparatorFactory.INSTANCE,
+                    IntegerBinaryComparatorFactory.INSTANCE },
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new MultiAggregatorDescriptorFactory(
+                            new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+                    new MultiAggregatorDescriptorFactory(
+                            new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
+                                    keys.length) }),
+                    outDesc,
+                    new HashSpillableGroupingTableFactory(
+                            new FieldHashPartitionComputerFactory(keys,
+                                    new IBinaryHashFunctionFactory[] {
+                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
+                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
+                            htSize), false);
+
+            createPartitionConstraint(spec, grouper, outSplits);
+
+            // Connect scanner with the grouper
+            IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(
+                    spec, new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] {
+                            // IntegerBinaryHashFunctionFactory.INSTANCE,
+                            IntegerBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
         }
 
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
+                outSplits);
 
         AbstractSingleActivityOperatorDescriptor writer;
 
         if (outPlain)
-            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+            writer = new PlainFileWriterOperatorDescriptor(spec,
+                    outSplitProvider, "|");
         else
-            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+            writer = new FrameFileWriterOperatorDescriptor(spec,
+                    outSplitProvider);
 
         createPartitionConstraint(spec, writer, outSplits);
 
-        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(
+                spec);
         spec.connect(groupOutConn, grouper, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
     }
 
-    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+    private static void createPartitionConstraint(JobSpecification spec,
+            IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
             parts[i] = splits[i].getNodeName();
         }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+        PartitionConstraintHelper
+                .addAbsoluteLocationConstraint(spec, op, parts);
     }
 }
\ No newline at end of file